/*
* Copyright 2012-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.facebook.buck.rules;

import com.facebook.buck.event.AbstractBuckEvent;
import com.facebook.buck.event.BuckEvent;
import com.facebook.buck.event.BuckEventBus;
import com.facebook.buck.event.ConsoleEvent;
import com.facebook.buck.event.ThrowableConsoleEvent;
import com.facebook.buck.util.FileHashCache;
import com.facebook.buck.util.HumanReadableException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import com.google.common.hash.HashCode;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.BadRequestException;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
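/**
 * Implementation of {@link ArtifactCache} backed by a Cassandra cluster.
 *
 * <p>Artifacts live in the "Artifacts" column family, keyed by the string form of the
 * {@link RuleKey}. Each stored value is a serialized {@link HashCode} checksum (written with an
 * {@link ObjectOutputStream}) followed by the raw bytes of the artifact; the checksum is
 * verified on fetch before the artifact is moved into place. A "Configuration" column family
 * holds a magic marker row used as a schema sanity check, plus the TTL (in seconds) applied to
 * stored artifacts.
 *
 * <p>A sketch of typical construction; the hosts, port, and timeout below are illustrative
 * placeholders rather than values taken from this class:
 *
 * <pre>{@code
 * ArtifactCache cache = new CassandraArtifactCache(
 *     "cass1.example.com,cass2.example.com", // comma-separated seed hosts
 *     9160,                                  // Cassandra Thrift port
 *     10,                                    // connection timeout, in seconds
 *     true,                                  // doStore: write artifacts as well as fetch them
 *     buckEventBus,
 *     fileHashCache);
 * }</pre>
 */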
public class CassandraArtifactCache implements ArtifactCache {
/**
* If the user is offline, then we do not want to print every connection failure that occurs.
* However, in practice, it appears that some connection failures can be intermittent, so we
* should print enough to provide a signal of how flaky the connection is.
*/
private static final int MAX_CONNECTION_FAILURE_REPORTS = 10;
private static final String POOL_NAME = "ArtifactCachePool";
private static final String CLUSTER_NAME = "BuckCacheCluster";
private static final String KEYSPACE_NAME = "Buck";
private static final String CONFIGURATION_COLUMN_FAMILY_NAME = "Configuration";
private static final String CONFIGURATION_MAGIC_KEY = "magic";
private static final String CONFIGURATION_MAGIC_VALUE = "Buck artifact cache";
private static final String CONFIGURATION_TTL_KEY = "ttl";
private static final String CONFIGURATION_COLUMN_NAME = "value";
private static final ColumnFamily<String, String> CF_CONFIG = new ColumnFamily<String, String>(
CONFIGURATION_COLUMN_FAMILY_NAME,
StringSerializer.get(),
StringSerializer.get());
private static final String ARTIFACT_COLUMN_FAMILY_NAME = "Artifacts";
private static final String ARTIFACT_COLUMN_NAME = "artifact";
private static final ColumnFamily<String, String> CF_ARTIFACT = new ColumnFamily<String, String>(
ARTIFACT_COLUMN_FAMILY_NAME,
StringSerializer.get(),
StringSerializer.get());
private final AstyanaxContext<Keyspace> context;
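/**
 * Immutable pair of the connected {@link Keyspace} and the TTL (in seconds) read from the
 * configuration column family; resolved once, asynchronously, via {@code keyspaceAndTtlFuture}.
 */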
private static final class KeyspaceAndTtl {
private final Keyspace keyspace;
private final int ttl;
private Keyspace getKeyspace() {
return keyspace;
}
private int getTtl() {
return ttl;
}
private KeyspaceAndTtl(Keyspace keyspace, int ttl) {
this.keyspace = keyspace;
this.ttl = ttl;
}
}
private final int timeoutSeconds;
private final Future<KeyspaceAndTtl> keyspaceAndTtlFuture;
private final AtomicInteger numConnectionExceptionReports;
private final boolean doStore;
private final BuckEventBus buckEventBus;
private final FileHashCache fileHashCache;
private final Set<ListenableFuture<OperationResult<Void>>> futures;
private final AtomicBoolean isWaitingToClose;
private final AtomicBoolean isKilled;
public CassandraArtifactCache(
String hosts,
int port,
int timeoutSeconds,
boolean doStore,
BuckEventBus buckEventBus,
FileHashCache fileHashCache)
throws ConnectionException {
this(timeoutSeconds, doStore, buckEventBus, fileHashCache, new AstyanaxContext.Builder()
.forCluster(CLUSTER_NAME)
.forKeyspace(KEYSPACE_NAME)
.withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
.setCqlVersion("3.0.0")
.setTargetCassandraVersion("1.2")
.setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
)
.withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl(POOL_NAME)
.setSeeds(hosts)
.setPort(port)
.setMaxConnsPerHost(1)
)
.withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
.buildKeyspace(ThriftFamilyFactory.getInstance()));
}
@VisibleForTesting
CassandraArtifactCache(
int timeoutSeconds,
boolean doStore,
BuckEventBus buckEventBus,
FileHashCache fileHashCache,
final AstyanaxContext<Keyspace> context) {
this.doStore = doStore;
this.buckEventBus = buckEventBus;
this.fileHashCache = fileHashCache;
this.numConnectionExceptionReports = new AtomicInteger(0);
this.timeoutSeconds = timeoutSeconds;
this.context = context;
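// Resolve the keyspace and TTL on a single background thread so that construction never blocks
// on the network; the exiting executor lets the JVM shut down even if the connection attempt is
// still hanging.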
ExecutorService connectionService = MoreExecutors.getExitingExecutorService(
(ThreadPoolExecutor) Executors.newFixedThreadPool(1), 0, TimeUnit.SECONDS);
this.keyspaceAndTtlFuture = connectionService.submit(new Callable<KeyspaceAndTtl>() {
@Override
public KeyspaceAndTtl call() throws ConnectionException {
context.start();
Keyspace keyspace = context.getClient();
try {
verifyMagic(keyspace);
int ttl = getTtl(keyspace);
return new KeyspaceAndTtl(keyspace, ttl);
} catch (ConnectionException e) {
reportConnectionFailure("Attempting to get keyspace and ttl from server.", e);
throw e;
}
}
});
this.futures = Sets.newSetFromMap(
new ConcurrentHashMap<ListenableFuture<OperationResult<Void>>, Boolean>());
this.isWaitingToClose = new AtomicBoolean(false);
this.isKilled = new AtomicBoolean(false);
}
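/**
 * Checks that the keyspace contains the expected magic marker row, guarding against talking to
 * a cluster that was never provisioned with the Buck artifact cache schema.
 */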
private static void verifyMagic(Keyspace keyspace) throws ConnectionException {
OperationResult<ColumnList<String>> result;
try {
result = keyspace.prepareQuery(CF_CONFIG)
.getKey(CONFIGURATION_MAGIC_KEY)
.execute();
} catch (BadRequestException e) {
throw new HumanReadableException("Artifact cache error during schema verification: %s",
e.getMessage());
}
Column<String> column = result.getResult().getColumnByName(CONFIGURATION_COLUMN_NAME);
if (column == null || !column.getStringValue().equals(CONFIGURATION_MAGIC_VALUE)) {
throw new HumanReadableException("Artifact cache schema mismatch");
}
}
/**
 * @return The keyspace and TTL resulting from connecting to Cassandra if the connection
 *     succeeded, otherwise Optional.absent(). This method blocks until the connection attempt
 *     finishes or the configured timeout elapses, whichever comes first.
 */
private Optional<KeyspaceAndTtl> getKeyspaceAndTtl()
throws InterruptedException {
if (isKilled.get()) {
return Optional.absent();
}
try {
return Optional.of(keyspaceAndTtlFuture.get(timeoutSeconds, TimeUnit.SECONDS));
} catch (TimeoutException e) {
keyspaceAndTtlFuture.cancel(true);
isKilled.set(true);
} catch (ExecutionException e) {
if (!(e.getCause() instanceof ConnectionException)) {
buckEventBus.post(
ThrowableConsoleEvent.create(
e,
"Unexpected error when fetching keyspace and ttl: %s.",
e.getMessage()));
}
} catch (CancellationException e) {
return Optional.absent();
}
return Optional.absent();
}
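/**
 * @return the TTL, in seconds, stored under the "ttl" row of the configuration column family;
 *     this value is later used as the default TTL when artifacts are written.
 */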
private static int getTtl(Keyspace keyspace) throws ConnectionException {
OperationResult<ColumnList<String>> result = keyspace.prepareQuery(CF_CONFIG)
.getKey(CONFIGURATION_TTL_KEY)
.execute();
Column<String> column = result.getResult().getColumnByName(CONFIGURATION_COLUMN_NAME);
if (column == null) {
throw new HumanReadableException("Artifact cache schema malformation.");
}
try {
return Integer.parseInt(column.getStringValue());
} catch (NumberFormatException e) {
throw new HumanReadableException("Artifact cache ttl malformation: \"%s\".",
column.getStringValue());
}
}
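/**
 * Fetches the artifact for {@code ruleKey}: reads the single artifact column, verifies the
 * checksum embedded at the front of the payload, writes the contents to a temp file next to
 * {@code output}, and then moves it into place. Entries that have lived past half of their TTL
 * are re-stored to refresh the TTL.
 */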
@Override
public CacheResult fetch(RuleKey ruleKey, File output)
throws InterruptedException {
Optional<KeyspaceAndTtl> keyspaceAndTtl = getKeyspaceAndTtl();
if (!keyspaceAndTtl.isPresent()) {
// Connecting to Cassandra failed, so report a cache miss.
return CacheResult.MISS;
}
// Execute the query to Cassandra.
OperationResult<ColumnList<String>> result;
int ttl;
try {
Keyspace keyspace = keyspaceAndTtl.get().getKeyspace();
ttl = keyspaceAndTtl.get().getTtl();
result = keyspace.prepareQuery(CF_ARTIFACT)
.getKey(ruleKey.toString())
.execute();
} catch (ConnectionException e) {
reportConnectionFailure("Attempting to fetch " + ruleKey + ".", e);
return CacheResult.MISS;
}
CacheResult success = CacheResult.MISS;
try {
Column<String> column = result.getResult().getColumnByName(ARTIFACT_COLUMN_NAME);
if (column != null) {
ByteArrayInputStream dataStream = new ByteArrayInputStream(column.getByteArrayValue());
// Set up an object input stream to deserialize the hash code.
try (ObjectInputStream objectStream = new ObjectInputStream(dataStream)) {
// Deserialize the expected hash code object from the front of the artifact.
HashCode expectedHashCode;
try {
expectedHashCode = (HashCode) objectStream.readObject();
} catch (ClassNotFoundException | ClassCastException e) {
buckEventBus.post(
ThrowableConsoleEvent.create(
e,
"Could not deserialize artifact checksum from %s:%s.",
ruleKey,
output.getPath()));
return CacheResult.MISS;
}
// Write the contents to a temp file that sits next to the real destination.
Path path = output.toPath();
Files.createDirectories(path.getParent());
Path temp = Files.createTempFile(path.getParent(), path.getFileName().toString(), ".tmp");
Files.copy(dataStream, temp, StandardCopyOption.REPLACE_EXISTING);
// Compare the embedded hash code with the one we calculated here. If they don't match,
// discard the output and report a mismatch event.
HashCode actualHashCode = fileHashCache.get(temp);
if (!expectedHashCode.equals(actualHashCode)) {
buckEventBus.post(new CassandraChecksumMismatchEvent(expectedHashCode, actualHashCode));
Files.delete(temp);
return CacheResult.MISS;
}
// Finally, move the temp file into its final place.
Files.move(temp, path, StandardCopyOption.REPLACE_EXISTING);
}
// Cassandra timestamps use microsecond resolution.
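// ttl is in seconds, so ttl * 1000000 / 2 is half of the TTL expressed in microseconds; for
// example, with a 7-day TTL, any entry older than 3.5 days is rewritten below.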
if (System.currentTimeMillis() * 1000L - column.getTimestamp() > ttl * 1000000L / 2L) {
// The cache entry has lived for more than half of its total TTL, so rewrite it in order
// to reset the TTL.
store(ruleKey, output);
}
success = CacheResult.CASSANDRA_HIT;
}
} catch (IOException e) {
buckEventBus.post(ThrowableConsoleEvent.create(e,
"Artifact was fetched but could not be written: %s at %s.",
ruleKey,
output.getPath()));
}
buckEventBus.post(ConsoleEvent.fine("Artifact fetch(%s, %s) cache %s",
ruleKey,
output.getPath(),
(success.isSuccess() ? "hit" : "miss")));
return success;
}
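/**
 * Asynchronously stores the artifact for {@code ruleKey}: the payload written to Cassandra is
 * the serialized {@link HashCode} of {@code output} followed by the file's raw bytes, stored
 * with the configured TTL. The resulting mutation future is tracked so that {@link #close()}
 * can wait for it.
 */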
@Override
public void store(RuleKey ruleKey, File output) throws InterruptedException {
if (!isStoreSupported()) {
return;
}
Optional<KeyspaceAndTtl> keyspaceAndTtl = getKeyspaceAndTtl();
if (!keyspaceAndTtl.isPresent()) {
return;
}
try {
// Prepare a byte stream to stage the data we're storing.
ByteArrayOutputStream dataStream = new ByteArrayOutputStream();
// Set up an object output stream wrapper to serialize the hash code.
try (ObjectOutputStream objectStream = new ObjectOutputStream(dataStream)) {
// Store the hash code at the beginning of the data we're storing.
HashCode hashCode = fileHashCache.get(output.toPath());
try {
objectStream.writeObject(hashCode);
objectStream.flush();
} catch (NotSerializableException e) {
buckEventBus.post(
ThrowableConsoleEvent.create(
e,
"Artifact store(%s, %s) error: %s",
ruleKey,
output.getPath()));
return;
}
// The rest of the data is the contents of the artifact.
Files.copy(output.toPath(), dataStream);
Keyspace keyspace = keyspaceAndTtl.get().getKeyspace();
int ttl = keyspaceAndTtl.get().getTtl();
MutationBatch mutationBatch = keyspace.prepareMutationBatch();
mutationBatch.withRow(CF_ARTIFACT, ruleKey.toString())
.setDefaultTtl(ttl)
.putColumn(ARTIFACT_COLUMN_NAME, dataStream.toByteArray());
ListenableFuture<OperationResult<Void>> mutationFuture = mutationBatch.executeAsync();
trackFuture(mutationFuture);
}
} catch (ConnectionException e) {
reportConnectionFailure("Attempting to store " + ruleKey + ".", e);
} catch (IOException | OutOfMemoryError e) {
buckEventBus.post(ThrowableConsoleEvent.create(e,
"Artifact store(%s, %s) error: %s",
ruleKey,
output.getPath()));
}
}
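/**
 * Records an in-flight store so that {@link #close()} can block on it; completed futures remove
 * themselves from the set unless a close is already draining it.
 */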
private void trackFuture(final ListenableFuture<OperationResult<Void>> future) {
futures.add(future);
Futures.addCallback(future, new FutureCallback<OperationResult<Void>>() {
@Override
public void onSuccess(OperationResult<Void> result) {
removeFuture();
}
@Override
public void onFailure(Throwable t) {
removeFuture();
}
private void removeFuture() {
if (!isWaitingToClose.get()) {
futures.remove(future);
}
}
});
}
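/**
 * Blocks until all in-flight stores complete (or the current thread is interrupted), then shuts
 * down the Astyanax context.
 */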
@Override
@SuppressWarnings("PMD.EmptyCatchBlock")
public void close() {
isWaitingToClose.set(true);
ListenableFuture<List<OperationResult<Void>>> future = Futures.allAsList(futures);
try {
future.get();
} catch (ExecutionException e) {
// Swallow exception and move on.
} catch (InterruptedException e) {
try {
future.cancel(true);
} catch (CancellationException ignored) {
// ListenableFuture may throw when its future is cancelled.
}
Thread.currentThread().interrupt();
return;
} finally {
context.shutdown();
}
}
@Override
public boolean isStoreSupported() {
return doStore;
}
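/**
 * Posts a warning for a connection failure, but only for the first
 * MAX_CONNECTION_FAILURE_REPORTS failures, so an offline user is not flooded with reports.
 */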
private void reportConnectionFailure(String context, ConnectionException exception) {
if (numConnectionExceptionReports.incrementAndGet() < MAX_CONNECTION_FAILURE_REPORTS) {
buckEventBus.post(new CassandraConnectionExceptionEvent(
exception,
String.format(
"%s Connecting to cassandra failed: %s.",
context,
exception.getMessage())));
}
}
public static class CassandraConnectionExceptionEvent extends ThrowableConsoleEvent {
public CassandraConnectionExceptionEvent(Throwable throwable, String message) {
super(throwable, Level.WARNING, message);
}
}
public static class CassandraChecksumMismatchEvent extends AbstractBuckEvent {
private final HashCode expected;
private final HashCode actual;
public CassandraChecksumMismatchEvent(HashCode expected, HashCode actual) {
this.expected = expected;
this.actual = actual;
}
@Override
protected String getValueString() {
return String.format(
"Checksum mismatch: %s (expected) != %s (actual)",
expected,
actual);
}
@Override
public boolean isRelatedTo(BuckEvent event) {
return false;
}
@Override
public String getEventName() {
return "CassandraChecksumMismatchEvent";
}
public HashCode getExpected() {
return expected;
}
public HashCode getActual() {
return actual;
}
}
}