| // Copyright (C) 2013 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.google.gerrit.lucene; |
| |
| import static com.google.common.collect.ImmutableList.toImmutableList; |
| import static com.google.gerrit.lucene.AbstractLuceneIndex.sortFieldName; |
| import static com.google.gerrit.server.git.QueueProvider.QueueType.INTERACTIVE; |
| import static com.google.gerrit.server.index.change.ChangeField.CHANGENUM_SPEC; |
| import static com.google.gerrit.server.index.change.ChangeField.NUMERIC_ID_STR_SPEC; |
| import static com.google.gerrit.server.index.change.ChangeField.PROJECT_SPEC; |
| import static com.google.gerrit.server.index.change.ChangeIndexRewriter.CLOSED_STATUSES; |
| import static com.google.gerrit.server.index.change.ChangeIndexRewriter.OPEN_STATUSES; |
| import static java.util.Objects.requireNonNull; |
| |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import com.google.common.collect.ListMultimap; |
| import com.google.common.collect.MultimapBuilder; |
| import com.google.common.collect.Sets; |
| import com.google.common.flogger.FluentLogger; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import com.google.gerrit.common.Nullable; |
| import com.google.gerrit.entities.Change; |
| import com.google.gerrit.entities.Project; |
| import com.google.gerrit.entities.converter.ChangeProtoConverter; |
| import com.google.gerrit.entities.converter.ProtoConverter; |
| import com.google.gerrit.exceptions.StorageException; |
| import com.google.gerrit.index.PaginationType; |
| import com.google.gerrit.index.QueryOptions; |
| import com.google.gerrit.index.Schema; |
| import com.google.gerrit.index.SchemaFieldDefs.SchemaField; |
| import com.google.gerrit.index.query.FieldBundle; |
| import com.google.gerrit.index.query.HasCardinality; |
| import com.google.gerrit.index.query.Predicate; |
| import com.google.gerrit.index.query.QueryParseException; |
| import com.google.gerrit.index.query.ResultSet; |
| import com.google.gerrit.proto.Protos; |
| import com.google.gerrit.server.change.MergeabilityComputationBehavior; |
| import com.google.gerrit.server.config.GerritServerConfig; |
| import com.google.gerrit.server.config.SitePaths; |
| import com.google.gerrit.server.index.IndexExecutor; |
| import com.google.gerrit.server.index.IndexUtils; |
| import com.google.gerrit.server.index.change.ChangeField; |
| import com.google.gerrit.server.index.change.ChangeIndex; |
| import com.google.gerrit.server.index.change.ChangeIndexRewriter; |
| import com.google.gerrit.server.index.options.AutoFlush; |
| import com.google.gerrit.server.query.change.ChangeData; |
| import com.google.gerrit.server.query.change.ChangeDataSource; |
| import com.google.inject.Inject; |
| import com.google.inject.assistedinject.Assisted; |
| import com.google.protobuf.MessageLite; |
| import java.io.IOException; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.function.Function; |
| import org.apache.lucene.document.Document; |
| import org.apache.lucene.index.IndexWriter; |
| import org.apache.lucene.index.IndexableField; |
| import org.apache.lucene.index.Term; |
| import org.apache.lucene.search.IndexSearcher; |
| import org.apache.lucene.search.Query; |
| import org.apache.lucene.search.ScoreDoc; |
| import org.apache.lucene.search.SearcherFactory; |
| import org.apache.lucene.search.SearcherManager; |
| import org.apache.lucene.search.Sort; |
| import org.apache.lucene.search.SortField; |
| import org.apache.lucene.search.TopDocs; |
| import org.apache.lucene.search.TopFieldDocs; |
| import org.apache.lucene.store.ByteBuffersDirectory; |
| import org.apache.lucene.util.BytesRef; |
| import org.eclipse.jgit.lib.Config; |
| |
| /** |
| * Secondary index implementation using Apache Lucene. |
| * |
| * <p>Writes are managed using a single {@link IndexWriter} per process, committed aggressively. |
| * Reads use {@link SearcherManager} and periodically refresh, though there may be some lag between |
| * a committed write and it showing up to other threads' searchers. |
| */ |
| public class LuceneChangeIndex implements ChangeIndex { |
| private static final FluentLogger logger = FluentLogger.forEnclosingClass(); |
| |
| static final String UPDATED_SORT_FIELD = sortFieldName(ChangeField.UPDATED_SPEC); |
| static final String MERGED_ON_SORT_FIELD = sortFieldName(ChangeField.MERGED_ON_SPEC); |
| static final String ID_STR_SORT_FIELD = sortFieldName(ChangeField.NUMERIC_ID_STR_SPEC); |
| |
| private static final String CHANGES = "changes"; |
| private static final String CHANGES_OPEN = "open"; |
| private static final String CHANGES_CLOSED = "closed"; |
| private static final String CHANGE_FIELD = ChangeField.CHANGE_SPEC.getName(); |
| |
| /** Builds the ID term for the given change, keyed by its virtual ID. */ |
| static Term idTerm(ChangeData cd) { |
|   Change.Id virtualId = cd.virtualId(); |
|   return idTerm(virtualId); |
| } |
| |
| /** Builds a string term on the numeric-ID string field holding the decimal form of {@code id}. */ |
| static Term idTerm(Change.Id id) { |
|   String fieldName = NUMERIC_ID_STR_SPEC.getName(); |
|   String idValue = Integer.toString(id.get()); |
|   return QueryBuilder.stringTerm(fieldName, idValue); |
| } |
| |
| private final ListeningExecutorService executor; |
| private final ChangeData.Factory changeDataFactory; |
| private final Schema<ChangeData> schema; |
| private final QueryBuilder<ChangeData> queryBuilder; |
| private final ChangeSubIndex openIndex; |
| private final ChangeSubIndex closedIndex; |
| private final ImmutableSet<String> skipFields; |
| |
| /** |
|  * Opens the open-changes and closed-changes sub-indexes. |
|  * |
|  * <p>When {@code LuceneIndexModule.isInMemoryTest(cfg)} is true both sub-indexes are backed by a |
|  * {@link ByteBuffersDirectory}; otherwise they are opened in the "open" and "closed" |
|  * subdirectories of the directory returned by {@code LuceneVersionManager.getDir}. |
|  * |
|  * @throws IOException if creating a sub-index fails |
|  */ |
| @Inject |
| LuceneChangeIndex( |
|     @GerritServerConfig Config cfg, |
|     SitePaths sitePaths, |
|     @IndexExecutor(INTERACTIVE) ListeningExecutorService executor, |
|     ChangeData.Factory changeDataFactory, |
|     @Assisted Schema<ChangeData> schema, |
|     AutoFlush autoFlush) |
|     throws IOException { |
|   this.executor = executor; |
|   this.changeDataFactory = changeDataFactory; |
|   this.schema = schema; |
| |
|   // The mergeable field is skipped unless the configured behavior includes it in the index. |
|   if (MergeabilityComputationBehavior.fromConfig(cfg).includeInIndex()) { |
|     this.skipFields = ImmutableSet.of(); |
|   } else { |
|     this.skipFields = ImmutableSet.of(ChangeField.MERGEABLE_SPEC.getName()); |
|   } |
| |
|   GerritIndexWriterConfig openWriterConfig = new GerritIndexWriterConfig(cfg, "changes_open"); |
|   GerritIndexWriterConfig closedWriterConfig = new GerritIndexWriterConfig(cfg, "changes_closed"); |
| |
|   // Queries are analyzed with the analyzer of the open sub-index configuration. |
|   this.queryBuilder = new QueryBuilder<>(schema, openWriterConfig.getAnalyzer()); |
| |
|   // One searcher factory is shared by both sub-indexes. |
|   SearcherFactory sharedSearcherFactory = new SearcherFactory(); |
|   if (!LuceneIndexModule.isInMemoryTest(cfg)) { |
|     Path changesDir = LuceneVersionManager.getDir(sitePaths, CHANGES, schema); |
|     this.openIndex = |
|         new ChangeSubIndex( |
|             schema, |
|             sitePaths, |
|             changesDir.resolve(CHANGES_OPEN), |
|             skipFields, |
|             openWriterConfig, |
|             sharedSearcherFactory, |
|             autoFlush); |
|     this.closedIndex = |
|         new ChangeSubIndex( |
|             schema, |
|             sitePaths, |
|             changesDir.resolve(CHANGES_CLOSED), |
|             skipFields, |
|             closedWriterConfig, |
|             sharedSearcherFactory, |
|             autoFlush); |
|   } else { |
|     this.openIndex = |
|         new ChangeSubIndex( |
|             schema, |
|             sitePaths, |
|             new ByteBuffersDirectory(), |
|             "ramOpen", |
|             skipFields, |
|             openWriterConfig, |
|             sharedSearcherFactory, |
|             autoFlush); |
|     this.closedIndex = |
|         new ChangeSubIndex( |
|             schema, |
|             sitePaths, |
|             new ByteBuffersDirectory(), |
|             "ramClosed", |
|             skipFields, |
|             closedWriterConfig, |
|             sharedSearcherFactory, |
|             autoFlush); |
|   } |
| } |
| |
| /** |
|  * Closes both sub-indexes. The closed-changes sub-index is closed even when closing the |
|  * open-changes sub-index throws. |
|  */ |
| @Override |
| public void close() { |
|   try { |
|     openIndex.close(); |
|   } finally { |
|     // Runs regardless of the outcome above so the second sub-index is never left open. |
|     closedIndex.close(); |
|   } |
| } |
| |
| /** Returns the schema this index was constructed with. */ |
| @Override |
| public Schema<ChangeData> getSchema() { |
|   return this.schema; |
| } |
| |
| /** |
|  * Re-indexes the given change. |
|  * |
|  * <p>The document is written to the open sub-index when {@code cd.change().isNew()} and to the |
|  * closed sub-index otherwise; documents with the same ID term are deleted from the other |
|  * sub-index. Blocks until both operations have completed. |
|  * |
|  * @param cd change to index |
|  * @throws StorageException if either operation fails, or if the calling thread is interrupted |
|  *     while waiting (the thread's interrupt status is restored before throwing) |
|  */ |
| @Override |
| public void replace(ChangeData cd) { |
|   Term id = LuceneChangeIndex.idTerm(cd); |
|   // toDocument is essentially static and doesn't depend on the specific |
|   // sub-index, so just pick one. |
|   Document doc = openIndex.toDocument(cd); |
|   try { |
|     if (cd.change().isNew()) { |
|       Futures.allAsList(closedIndex.delete(id), openIndex.replace(id, doc)).get(); |
|     } else { |
|       Futures.allAsList(openIndex.delete(id), closedIndex.replace(id, doc)).get(); |
|     } |
|   } catch (InterruptedException e) { |
|     // Catching InterruptedException clears the flag; restore it so callers can still observe |
|     // the interruption after the exception has been wrapped. |
|     Thread.currentThread().interrupt(); |
|     throw new StorageException(e); |
|   } catch (ExecutionException e) { |
|     throw new StorageException(e); |
|   } |
| } |
| |
| /** |
|  * Inserts the document for the given change. |
|  * |
|  * <p>The document goes to the open sub-index when {@code cd.change().isNew()} and to the closed |
|  * sub-index otherwise. Blocks until the insert has completed. |
|  * |
|  * @param cd change to index |
|  * @throws StorageException if the insert fails, or if the calling thread is interrupted while |
|  *     waiting (the thread's interrupt status is restored before throwing) |
|  */ |
| @Override |
| public void insert(ChangeData cd) { |
|   // toDocument is essentially static and doesn't depend on the specific |
|   // sub-index, so just pick one. |
|   Document doc = openIndex.toDocument(cd); |
|   try { |
|     if (cd.change().isNew()) { |
|       openIndex.insert(doc).get(); |
|     } else { |
|       closedIndex.insert(doc).get(); |
|     } |
|   } catch (InterruptedException e) { |
|     // Catching InterruptedException clears the flag; restore it so callers can still observe |
|     // the interruption after the exception has been wrapped. |
|     Thread.currentThread().interrupt(); |
|     throw new StorageException(e); |
|   } catch (ExecutionException e) { |
|     throw new StorageException(e); |
|   } |
| } |
| |
| @Override |
| public void deleteByValue(ChangeData value) { |
| delete(ChangeIndex.ENTITY_TO_KEY.apply(value)); |
| } |
| |
| /** |
|  * Deletes the documents matching the ID term of {@code changeId} from both sub-indexes and |
|  * blocks until both deletions have completed. |
|  * |
|  * @param changeId ID of the change to remove |
|  * @throws StorageException if a deletion fails, or if the calling thread is interrupted while |
|  *     waiting (the thread's interrupt status is restored before throwing) |
|  */ |
| @Override |
| public void delete(Change.Id changeId) { |
|   Term idTerm = LuceneChangeIndex.idTerm(changeId); |
|   try { |
|     Futures.allAsList(openIndex.delete(idTerm), closedIndex.delete(idTerm)).get(); |
|   } catch (InterruptedException e) { |
|     // Catching InterruptedException clears the flag; restore it so callers can still observe |
|     // the interruption after the exception has been wrapped. |
|     Thread.currentThread().interrupt(); |
|     throw new StorageException(e); |
|   } catch (ExecutionException e) { |
|     throw new StorageException(e); |
|   } |
| } |
| |
| /** Clears the open sub-index first, then the closed sub-index. */ |
| @Override |
| public void deleteAll() { |
|   this.openIndex.deleteAll(); |
|   this.closedIndex.deleteAll(); |
| } |
| |
| @Override |
| public ChangeDataSource getSource(Predicate<ChangeData> p, QueryOptions opts) |
| throws QueryParseException { |
| Set<Change.Status> statuses = ChangeIndexRewriter.getPossibleStatus(p); |
| List<ChangeSubIndex> indexes = new ArrayList<>(2); |
| if (!Sets.intersection(statuses, OPEN_STATUSES).isEmpty()) { |
| indexes.add(openIndex); |
| } |
| if (!Sets.intersection(statuses, CLOSED_STATUSES).isEmpty()) { |
| indexes.add(closedIndex); |
| } |
| return new QuerySource(indexes, p, opts, getSort(), openIndex::toFieldBundle); |
| } |
| |
| /** Sets the ready bit of this index. */ |
| @Override |
| public void markReady(boolean ready) { |
|   // The ready bit is kept per index rather than per sub-index, so it is |
|   // arbitrarily set through the open sub-index. |
|   this.openIndex.markReady(ready); |
| } |
| |
| /** |
|  * Snapshots the open sub-index and then the closed sub-index. |
|  * |
|  * @return true only if both snapshots report success; the closed sub-index is not attempted |
|  *     when the open one reports failure |
|  */ |
| @Override |
| public boolean snapshot(String id) throws IOException { |
|   if (!openIndex.snapshot(id)) { |
|     return false; |
|   } |
|   return closedIndex.snapshot(id); |
| } |
| |
| private Sort getSort() { |
| return new Sort( |
| new SortField(UPDATED_SORT_FIELD, SortField.Type.LONG, true), |
| new SortField(MERGED_ON_SORT_FIELD, SortField.Type.LONG, true), |
| new SortField(ID_STR_SORT_FIELD, SortField.Type.LONG, true)); |
| } |
| |
| private class QuerySource implements ChangeDataSource { |
| private final List<ChangeSubIndex> indexes; |
| private final Predicate<ChangeData> predicate; |
| private final Query query; |
| private final QueryOptions opts; |
| private final Sort sort; |
| private final Function<Document, FieldBundle> rawDocumentMapper; |
| private final boolean isSearchAfterPagination; |
| |
| /** |
|  * Captures the sub-indexes to search and converts the predicate into a Lucene query up front. |
|  * |
|  * @throws QueryParseException if the predicate cannot be converted |
|  * @throws NullPointerException if the query builder returns a null query |
|  */ |
| private QuerySource( |
|     List<ChangeSubIndex> indexes, |
|     Predicate<ChangeData> predicate, |
|     QueryOptions opts, |
|     Sort sort, |
|     Function<Document, FieldBundle> rawDocumentMapper) |
|     throws QueryParseException { |
|   this.indexes = indexes; |
|   this.predicate = predicate; |
|   this.query = requireNonNull(queryBuilder.toQuery(predicate), "null query from Lucene"); |
|   this.opts = opts; |
|   this.sort = sort; |
|   this.rawDocumentMapper = rawDocumentMapper; |
|   // Cached once because it is consulted for every sub-index on every read. |
|   boolean searchAfter = opts.config().paginationType().equals(PaginationType.SEARCH_AFTER); |
|   this.isSearchAfterPagination = searchAfter; |
| } |
| |
| /** Returns the predicate's own cardinality when it provides one, and 10 otherwise. */ |
| @Override |
| public int getCardinality() { |
|   return predicate instanceof HasCardinality |
|       ? ((HasCardinality) predicate).getCardinality() |
|       : 10; |
| } |
| |
| /** Always returns false for this source. */ |
| @Override |
| public boolean hasChange() { |
|   final boolean hasChange = false; |
|   return hasChange; |
| } |
| |
| @Override |
| public String toString() { |
| return predicate.toString(); |
| } |
| |
| /** |
|  * Submits the Lucene read to the executor and returns a result set backed by the pending |
|  * future. |
|  * |
|  * @throws StorageException if the calling thread is already interrupted; the interrupt status |
|  *     is left set |
|  */ |
| @Override |
| public ResultSet<ChangeData> read() { |
|   if (Thread.interrupted()) { |
|     // Thread.interrupted() cleared the flag; set it again before bailing out. |
|     Thread.currentThread().interrupt(); |
|     throw new StorageException("interrupted"); |
|   } |
| |
|   final Set<String> requestedFields = IndexUtils.changeFields(opts); |
|   // An anonymous class rather than a lambda so the task can carry its own toString(). |
|   Callable<Results> readTask = |
|       new Callable<Results>() { |
|         @Override |
|         public Results call() throws IOException { |
|           return doRead(requestedFields); |
|         } |
| |
|         @Override |
|         public String toString() { |
|           return predicate.toString(); |
|         } |
|       }; |
|   return new ChangeDataResults(executor.submit(readTask), requestedFields); |
| } |
| |
| /** |
|  * Runs the Lucene read on the calling thread and returns the matching documents mapped to |
|  * field bundles. |
|  * |
|  * @throws StorageException if the read fails with an {@link IOException} |
|  */ |
| @Override |
| public ResultSet<FieldBundle> readRaw() { |
|   final Results results; |
|   try { |
|     results = doRead(IndexUtils.changeFields(opts)); |
|   } catch (IOException e) { |
|     throw new StorageException(e); |
|   } |
| |
|   final Map<ChangeSubIndex, ScoreDoc> nextSearchAfter = results.searchAfterBySubIndex; |
|   final ImmutableList<FieldBundle> bundles = |
|       results.docs.stream().map(rawDocumentMapper).collect(toImmutableList()); |
| |
|   return new ResultSet<>() { |
|     @Override |
|     public Iterator<FieldBundle> iterator() { |
|       return bundles.iterator(); |
|     } |
| |
|     @Override |
|     public ImmutableList<FieldBundle> toList() { |
|       return bundles; |
|     } |
| |
|     @Override |
|     public void close() { |
|       // Do nothing. |
|     } |
| |
|     @Override |
|     public Object searchAfter() { |
|       return nextSearchAfter; |
|     } |
|   }; |
| } |
| |
| /** |
|  * Searches every selected sub-index, merges the per-index hits according to the sort and loads |
|  * the requested stored fields of the merged hits starting at {@code opts.start()}. |
|  * |
|  * <p>All acquired searchers are released before returning, whether or not the read succeeds. |
|  * |
|  * @param fields names of the stored fields to load for each hit |
|  * @return the loaded documents plus, for search-after pagination, the last hit per sub-index |
|  * @throws IOException if searching or loading a document fails |
|  */ |
| private Results doRead(Set<String> fields) throws IOException { |
|   IndexSearcher[] searchers = new IndexSearcher[indexes.size()]; |
|   Map<ChangeSubIndex, ScoreDoc> nextSearchAfter = new HashMap<>(); |
|   try { |
|     int pageLimit = AbstractLuceneIndex.getLimitBasedOnPaginationType(opts, opts.pageSize()); |
|     // Saturate at Integer.MAX_VALUE rather than overflowing when start + pageLimit is too big. |
|     int queryLimit = |
|         Integer.MAX_VALUE - pageLimit < opts.start() |
|             ? Integer.MAX_VALUE |
|             : opts.start() + pageLimit; |
| |
|     List<TopFieldDocs> perShardHits = new ArrayList<>(); |
|     int hitsSoFar = 0; |
|     for (int shard = 0; shard < searchers.length; shard++) { |
|       ChangeSubIndex subIndex = indexes.get(shard); |
|       // Stored in the array immediately so the finally block releases it on any failure. |
|       searchers[shard] = subIndex.acquire(); |
| |
|       if (!isSearchAfterPagination) { |
|         TopFieldDocs found = searchers[shard].search(query, queryLimit, sort); |
|         assignShardIndexValues(found, shard); |
|         perShardHits.add(found); |
|         continue; |
|       } |
| |
|       ScoreDoc previousLast = getSearchAfter(subIndex); |
|       int remaining = queryLimit - hitsSoFar; |
|       if (remaining <= 0) { |
|         // Earlier sub-indexes already produced enough hits; this one is not searched. |
|         continue; |
|       } |
|       TopFieldDocs found = |
|           searchers[shard].searchAfter( |
|               previousLast, query, remaining, sort, /* doDocScores= */ false); |
|       assignShardIndexValues(found, shard); |
|       hitsSoFar += found.scoreDocs.length; |
|       perShardHits.add(found); |
|       // Remember the last hit of this page, or keep the previous marker when nothing matched. |
|       nextSearchAfter.put( |
|           subIndex, Iterables.getLast(Arrays.asList(found.scoreDocs), previousLast)); |
|     } |
| |
|     TopDocs merged = TopDocs.merge(sort, queryLimit, perShardHits.toArray(new TopFieldDocs[0])); |
|     ScoreDoc[] ranked = merged.scoreDocs; |
| |
|     List<Document> loaded = new ArrayList<>(ranked.length); |
|     for (int pos = opts.start(); pos < ranked.length; pos++) { |
|       ScoreDoc hit = ranked[pos]; |
|       // shardIndex was set to the sub-index position, which is also the searcher position. |
|       loaded.add(searchers[hit.shardIndex].doc(hit.doc, fields)); |
|     } |
|     return new Results(loaded, nextSearchAfter); |
|   } finally { |
|     for (int shard = 0; shard < searchers.length; shard++) { |
|       if (searchers[shard] == null) { |
|         continue; |
|       } |
|       try { |
|         indexes.get(shard).release(searchers[shard]); |
|       } catch (IOException e) { |
|         logger.atWarning().withCause(e).log("cannot release Lucene searcher"); |
|       } |
|     } |
|   } |
| } |
| |
| /** |
|  * Stamps every hit in {@code doc} with the given shard index. |
|  * |
|  * <p>TopDocs.merge() no longer accepts a parameter asking it to set shard indices on hits as |
|  * they are seen during the merge; the API was simplified to allow custom tie breakers. If shard |
|  * indices are to be used for breaking ties between docs with equal scores during |
|  * TopDocs.merge(), the input ScoreDocs must have their shard indices set to valid values |
|  * before merge() is called. |
|  * |
|  * @param doc hits returned by one sub-index |
|  * @param shard index value to assign to each of those hits |
|  */ |
| private void assignShardIndexValues(TopFieldDocs doc, int shard) { |
|   for (ScoreDoc hit : doc.scoreDocs) { |
|     hit.shardIndex = shard; |
|   } |
| } |
| |
| /** |
|  * Looks up the search-after marker recorded for the given change sub-index. |
|  * |
|  * <p>Returns null unless pagination type is {@link PaginationType#SEARCH_AFTER search-after}, |
|  * the query options carry a {@link Map} as search-after value, and that map holds a {@link |
|  * ScoreDoc} for the sub-index. |
|  * |
|  * @param subIndex change sub-index |
|  * @return the score doc that can be used to page result sets, or null |
|  */ |
| @Nullable |
| private ScoreDoc getSearchAfter(ChangeSubIndex subIndex) { |
|   if (!isSearchAfterPagination) { |
|     return null; |
|   } |
|   Object marker = opts.searchAfter(); |
|   if (!(marker instanceof Map)) { |
|     // Also covers a null marker, i.e. the first page. |
|     return null; |
|   } |
|   Object lastHit = ((Map<?, ?>) marker).get(subIndex); |
|   return lastHit instanceof ScoreDoc ? (ScoreDoc) lastHit : null; |
| } |
| } |
| |
| /** |
|  * Immutable holder for the outcome of one Lucene read: the loaded documents and, per |
|  * sub-index, the score doc to continue from when paginating with search-after. |
|  */ |
| private static class Results { |
|   // Both references are assigned exactly once, so they are final. |
|   final List<Document> docs; |
|   final Map<ChangeSubIndex, ScoreDoc> searchAfterBySubIndex; |
| |
|   public Results(List<Document> docs, Map<ChangeSubIndex, ScoreDoc> searchAfterBySubIndex) { |
|     this.docs = docs; |
|     this.searchAfterBySubIndex = searchAfterBySubIndex; |
|   } |
| } |
| |
| private class ChangeDataResults implements ResultSet<ChangeData> { |
| private final Future<Results> future; |
| private final Set<String> fields; |
| private Map<ChangeSubIndex, ScoreDoc> searchAfterBySubIndex; |
| |
| ChangeDataResults(Future<Results> future, Set<String> fields) { |
| this.future = future; |
| this.fields = fields; |
| } |
| |
| @Override |
| public Iterator<ChangeData> iterator() { |
| return toList().iterator(); |
| } |
| |
| @Override |
| public ImmutableList<ChangeData> toList() { |
| try { |
| Results r = future.get(); |
| List<Document> docs = r.docs; |
| searchAfterBySubIndex = r.searchAfterBySubIndex; |
| ImmutableList.Builder<ChangeData> result = |
| ImmutableList.builderWithExpectedSize(docs.size()); |
| for (Document doc : docs) { |
| String fieldName = |
| doc.getField(CHANGENUM_SPEC.getName()) != null |
| ? CHANGENUM_SPEC.getName() |
| : NUMERIC_ID_STR_SPEC.getName(); |
| result.add(toChangeData(fields(doc, fields), fields, fieldName)); |
| } |
| return result.build(); |
| } catch (InterruptedException e) { |
| close(); |
| throw new StorageException(e); |
| } catch (ExecutionException e) { |
| Throwables.throwIfUnchecked(e.getCause()); |
| throw new StorageException(e.getCause()); |
| } |
| } |
| |
| @Override |
| public void close() { |
| future.cancel(false /* do not interrupt Lucene */); |
| } |
| |
| @Override |
| public Object searchAfter() { |
| return searchAfterBySubIndex; |
| } |
| } |
| |
| /** |
|  * Groups the fields of {@code doc} by name, keeping only fields whose name is contained in |
|  * {@code fields}. |
|  */ |
| private static ListMultimap<String, IndexableField> fields(Document doc, Set<String> fields) { |
|   ListMultimap<String, IndexableField> selected = |
|       MultimapBuilder.hashKeys(fields.size()).arrayListValues(4).build(); |
|   for (IndexableField candidate : doc) { |
|     String fieldName = candidate.name(); |
|     if (!fields.contains(fieldName)) { |
|       continue; |
|     } |
|     selected.put(fieldName, candidate); |
|   } |
|   return selected; |
| } |
| |
| /** |
|  * Builds a {@link ChangeData} from the stored fields of one document. |
|  * |
|  * <p>When the serialized change field is present the change is parsed from it; otherwise the |
|  * change data is created from the project and ID fields. Afterwards every schema field named in |
|  * {@code fields} is offered its stored values. |
|  * |
|  * @param doc stored fields of the document, grouped by field name |
|  * @param fields names of the fields that were requested |
|  * @param idFieldName name of the field holding the change ID |
|  */ |
| private ChangeData toChangeData( |
|     ListMultimap<String, IndexableField> doc, Set<String> fields, String idFieldName) { |
|   // Either change or the ID field was guaranteed to be included in the call |
|   // to fields() above. |
|   IndexableField changeField = Iterables.getFirst(doc.get(CHANGE_FIELD), null); |
|   IndexableField idField = Iterables.getFirst(doc.get(idFieldName), null); |
| |
|   ChangeData cd; |
|   if (changeField == null) { |
|     Change.Id id = Change.id(Integer.valueOf(idField.stringValue())); |
|     // IndexUtils#changeFields ensures either CHANGE or PROJECT is always present. |
|     IndexableField project = doc.get(PROJECT_SPEC.getName()).iterator().next(); |
|     cd = changeDataFactory.create(Project.nameKey(project.stringValue()), id); |
|   } else { |
|     // pass the id field value (which is the change virtual id for the imported changes) when |
|     // available |
|     cd = |
|         changeDataFactory.create( |
|             parseProtoFrom(changeField.binaryValue(), ChangeProtoConverter.INSTANCE), |
|             idField != null ? Change.id(Integer.valueOf(idField.stringValue())) : null); |
|   } |
| |
|   for (SchemaField<ChangeData, ?> schemaField : getSchema().getSchemaFields().values()) { |
|     if (!fields.contains(schemaField.getName())) { |
|       continue; |
|     } |
|     @SuppressWarnings("unused") |
|     var unused = |
|         schemaField.setIfPossible(cd, new LuceneStoredValue(doc.get(schemaField.getName()))); |
|   } |
|   return cd; |
| } |
| |
| /** |
|  * Parses the byte range described by {@code bytesRef} with the converter's parser and converts |
|  * the resulting message into its entity form. |
|  */ |
| private static <P extends MessageLite, T> T parseProtoFrom( |
|     BytesRef bytesRef, ProtoConverter<P, T> converter) { |
|   return converter.fromProto( |
|       Protos.parseUnchecked( |
|           converter.getParser(), bytesRef.bytes, bytesRef.offset, bytesRef.length)); |
| } |
| } |