blob: 01a67850beda37f31c32d110fc96de9473af304d [file] [log] [blame]
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.modules.cache.pg;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicMap;
import com.google.gerrit.server.cache.CacheBinding;
import com.google.gerrit.server.cache.MemoryCacheFactory;
import com.google.gerrit.server.cache.PersistentCacheFactory;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.lib.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Cache factory that backs caches with a {@link PgSqlStore} when a positive disk limit applies,
 * and falls back to the plain in-memory factory otherwise.
 *
 * <p>Every store-backed cache created here is remembered in {@link #caches} so that it can be
 * started, pruned and stopped together with the server lifecycle. All access to that list is
 * guarded by its own monitor because caches may be built or unloaded while the lifecycle methods
 * run.
 */
@Singleton
public class PgCacheFactory implements PersistentCacheFactory, LifecycleListener {
  private static final Logger log = LoggerFactory.getLogger(PgCacheFactory.class);

  private final PgSqlSource ds;
  private final Config cfg;
  private final MemoryCacheFactory memFactory;

  /** Store-backed caches created by this factory; guarded by {@code synchronized (caches)}. */
  private final List<PgCacheImpl<?, ?>> caches;

  private final DynamicMap<Cache<?, ?>> cacheMap;

  /** Single-threaded executor handed to the caches and loaders for their store work. */
  private final ExecutorService executor;

  /** Single daemon thread used to schedule cache pruning. */
  private final ScheduledExecutorService cleanup;

  @Inject
  PgCacheFactory(
      PgSqlSource ds,
      @GerritServerConfig Config cfg,
      MemoryCacheFactory memFactory,
      DynamicMap<Cache<?, ?>> cacheMap) {
    this.ds = ds;
    this.cfg = cfg;
    this.memFactory = memFactory;
    this.caches = new LinkedList<>();
    this.cacheMap = cacheMap;
    executor =
        Executors.newFixedThreadPool(
            1, new ThreadFactoryBuilder().setNameFormat("PgCache-Store-%d").build());
    cleanup =
        Executors.newScheduledThreadPool(
            1,
            new ThreadFactoryBuilder().setNameFormat("PgCache-Prune-%d").setDaemon(true).build());
  }

  /**
   * Starts every registered cache on the store executor and schedules a first prune of each cache
   * 30 seconds from now.
   */
  @Override
  public void start() {
    // Hold the list lock while iterating: build() and onStop() may mutate the list concurrently.
    // Only task submission happens under the lock, so it is held briefly.
    synchronized (caches) {
      for (PgCacheImpl<?, ?> cache : caches) {
        executor.execute(cache::start);
        @SuppressWarnings("unused")
        Future<?> possiblyIgnoredError =
            cleanup.schedule(() -> cache.prune(cleanup), 30, TimeUnit.SECONDS);
      }
    }
  }

  /**
   * Shuts down both executors, runs any store updates that were still queued, and then stops every
   * registered cache.
   *
   * <p>Waits up to 15 minutes for the in-flight store task to finish. If the calling thread is
   * interrupted while waiting, the caches are still stopped and the interrupt status is restored
   * before returning.
   */
  @Override
  public void stop() {
    boolean interrupted = false;
    try {
      cleanup.shutdownNow();
      List<Runnable> pending = executor.shutdownNow();
      if (executor.awaitTermination(15, TimeUnit.MINUTES)) {
        if (pending != null && !pending.isEmpty()) {
          // Tasks that never got to run are executed inline so their updates are not lost.
          log.info("Finishing {} disk cache updates", pending.size());
          for (Runnable update : pending) {
            update.run();
          }
        }
      } else {
        log.info("Timeout waiting for disk cache to close");
      }
    } catch (InterruptedException e) {
      log.warn("Interrupted waiting for disk cache to shutdown");
      interrupted = true;
    }

    try {
      synchronized (caches) {
        for (PgCacheImpl<?, ?> cache : caches) {
          cache.stop();
        }
      }
    } finally {
      // Restore the interrupt only after the caches are stopped, so that stopping them is not
      // disturbed by the flag while callers can still observe the interruption.
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Builds a cache for the given binding.
   *
   * @param in the cache binding describing name, key type and limits
   * @return a purely in-memory cache when the effective {@code cache.<name>.diskLimit} is not
   *     positive; otherwise a {@link PgCacheImpl} wrapping an in-memory cache and a SQL store
   */
  @SuppressWarnings({"unchecked"})
  @Override
  public <K, V> Cache<K, V> build(CacheBinding<K, V> in) {
    long limit = cfg.getLong("cache", in.name(), "diskLimit", in.diskLimit());
    if (limit <= 0) {
      return memFactory.build(in);
    }

    PgCacheBindingProxy<K, V> def = new PgCacheBindingProxy<>(in);
    PgSqlStore<K, V> store =
        newSqlStore(def.name(), def.keyType(), limit, def.expireAfterWrite(TimeUnit.SECONDS));
    PgCacheImpl<K, V> cache =
        new PgCacheImpl<>(
            def.name(),
            executor,
            store,
            def.keyType(),
            (Cache<K, ValueHolder<V>>) memFactory.build(def));
    register(cache);
    return cache;
  }

  /**
   * Builds a loading cache for the given binding.
   *
   * @param in the cache binding describing name, key type and limits
   * @param loader the loader used to compute missing values
   * @return a purely in-memory loading cache when the effective {@code cache.<name>.diskLimit} is
   *     not positive; otherwise a {@link PgCacheImpl} whose in-memory cache loads through a {@link
   *     PgCacheLoader} wrapping {@code loader}
   */
  @SuppressWarnings({"unchecked"})
  @Override
  public <K, V> LoadingCache<K, V> build(CacheBinding<K, V> in, CacheLoader<K, V> loader) {
    long limit = cfg.getLong("cache", in.name(), "diskLimit", in.diskLimit());
    if (limit <= 0) {
      return memFactory.build(in, loader);
    }

    PgCacheBindingProxy<K, V> def = new PgCacheBindingProxy<>(in);
    PgSqlStore<K, V> store =
        newSqlStore(def.name(), def.keyType(), limit, def.expireAfterWrite(TimeUnit.SECONDS));
    Cache<K, ValueHolder<V>> mem =
        (Cache<K, ValueHolder<V>>)
            memFactory.build(def, (CacheLoader<K, V>) new PgCacheLoader<>(executor, store, loader));
    PgCacheImpl<K, V> cache = new PgCacheImpl<>(def.name(), executor, store, def.keyType(), mem);
    register(cache);
    return cache;
  }

  /**
   * Stops and forgets every cache of the given plugin that was created by this factory.
   *
   * @param plugin name of the plugin whose caches are looked up in the cache map
   */
  @Override
  public void onStop(String plugin) {
    synchronized (caches) {
      for (Map.Entry<String, Provider<Cache<?, ?>>> entry : cacheMap.byPlugin(plugin).entrySet()) {
        Cache<?, ?> cache = entry.getValue().get();
        // remove() only succeeds for instances this factory registered, which makes the cast safe.
        if (caches.remove(cache)) {
          ((PgCacheImpl<?, ?>) cache).stop();
        }
      }
    }
  }

  /** Records a newly built cache under the list lock so lifecycle methods can reach it. */
  private void register(PgCacheImpl<?, ?> cache) {
    synchronized (caches) {
      caches.add(cache);
    }
  }

  /**
   * Creates the SQL store for one cache.
   *
   * @param expireAfterWrite expiry passed through to the store; {@code null} is mapped to 0
   */
  private <V, K> PgSqlStore<K, V> newSqlStore(
      String name, TypeLiteral<K> keyType, long maxSize, Long expireAfterWrite) {
    return new PgSqlStore<>(
        ds, name, keyType, maxSize, expireAfterWrite == null ? 0 : expireAfterWrite.longValue());
  }
}