blob: af4c191f4b546a68cf14fe6667c7c3c6e7683a61 [file] [log] [blame]
/*
* Copyright 2013-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.facebook.buck.util;
import com.facebook.buck.io.MorePaths;
import com.facebook.buck.io.ProjectFilesystem;
import com.facebook.buck.log.Logger;
import com.facebook.buck.timing.Clock;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
import com.google.common.io.ByteStreams;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
* A ProjectFilesystemWatcher implementation that uses a local watchman service.
*/
public class WatchmanWatcher implements ProjectFilesystemWatcher {
private static final Logger LOG = Logger.get(WatchmanWatcher.class);
private static final int DEFAULT_OVERFLOW_THRESHOLD = 10000;
private static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
private final Supplier<Process> watchmanProcessSupplier;
private final EventBus eventBus;
private final Clock clock;
private final ObjectMapper objectMapper;
private final String query;
/**
* The maximum number of watchman changes to process in each call to postEvents before
* giving up and generating an overflow. The goal is to be able to process a reasonable
* number of human generated changes quickly, but not spend a long time processing lots
* of changes after a branch switch which will end up invalidating the entire cache
* anyway. If overflow is negative calls to postEvents will just generate a single
* overflow event.
*/
private final int overflow;
private final long timeoutMillis;
public WatchmanWatcher(ProjectFilesystem filesystem,
EventBus fileChangeEventBus,
Clock clock,
ObjectMapper objectMapper,
Iterable<Path> ignorePaths,
Iterable<String> ignoreGlobs) {
this(createProcessSupplier(),
fileChangeEventBus,
clock,
objectMapper,
DEFAULT_OVERFLOW_THRESHOLD,
DEFAULT_TIMEOUT_MILLIS,
createQuery(
objectMapper,
MorePaths.absolutify(filesystem.getRootPath()).toString(),
UUID.randomUUID().toString(),
ignorePaths,
ignoreGlobs));
}
@VisibleForTesting
WatchmanWatcher(Supplier<Process> processSupplier,
EventBus fileChangeEventBus,
Clock clock,
ObjectMapper objectMapper,
int overflow,
long timeoutMillis,
String query) {
this.watchmanProcessSupplier = processSupplier;
this.eventBus = fileChangeEventBus;
this.clock = clock;
this.objectMapper = objectMapper;
this.overflow = overflow;
this.timeoutMillis = timeoutMillis;
this.query = query;
}
@VisibleForTesting
static String createQuery(
ObjectMapper objectMapper,
String rootPath,
String uuid,
Iterable<Path> ignorePaths,
Iterable<String> ignoreGlobs) {
List<Object> queryParams = new ArrayList<>();
queryParams.add("query");
queryParams.add(rootPath);
// Note that we use LinkedHashMap so insertion order is preserved. That
// helps us write tests that don't depend on the undefined order of HashMap.
Map<String, Object> sinceParams = new LinkedHashMap<>();
sinceParams.put(
"since",
new StringBuilder("n:buckd").append(uuid).toString());
// Exclude any expressions added to this list.
List<Object> excludeAnyOf = Lists.<Object>newArrayList("anyof");
// Exclude all directories.
excludeAnyOf.add(Lists.newArrayList("type", "d"));
// Exclude all files under directories in project.ignorePaths.
//
// Note that it's OK to exclude .git in a query (event though it's
// not currently OK to exclude .git in .watchmanconfig). This id
// because watchman's .git cookie magic is done before the query
// is applied.
for (Path ignorePath : ignorePaths) {
excludeAnyOf.add(
Lists.newArrayList(
"match",
ignorePath.toString() + "/*",
"wholename"));
}
// Exclude all files matching globs in project.ignoreGlobs.
for (String ignoreGlob : ignoreGlobs) {
excludeAnyOf.add(
Lists.newArrayList(
"match",
ignoreGlob,
"wholename"));
}
sinceParams.put(
"expression",
Lists.newArrayList(
"not",
excludeAnyOf));
sinceParams.put("empty_on_fresh_instance", true);
sinceParams.put("fields", Lists.newArrayList("name", "exists", "new"));
queryParams.add(sinceParams);
try {
return objectMapper.writeValueAsString(queryParams);
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
private static Supplier<Process> createProcessSupplier() {
final ProcessBuilder processBuilder = new ProcessBuilder(
"watchman",
"--server-encoding=json",
"--no-pretty",
"-j");
return new Supplier<Process>() {
@Override
public Process get() {
try {
LOG.debug("Starting watchman command: %s", processBuilder.command());
return processBuilder.start();
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
};
}
/**
* Query Watchman for file change events. If too many events are pending or an error occurs
* an overflow event is posted to the EventBus signalling that events may have been lost
* (and so typically caches must be cleared to avoid inconsistency). Interruptions and
* IOExceptions are propagated to callers, but typically if overflow events are handled
* conservatively by subscribers then no other remedial action is required.
*/
@Override
public void postEvents() throws IOException, InterruptedException {
Process watchmanProcess = watchmanProcessSupplier.get();
try {
LOG.debug("Writing query to Watchman: %s", query);
watchmanProcess.getOutputStream().write(query.getBytes(Charsets.US_ASCII));
watchmanProcess.getOutputStream().close();
LOG.debug("Parsing JSON output from Watchman");
final long parseStartTimeMillis = clock.currentTimeMillis();
InputStream jsonInput = watchmanProcess.getInputStream();
if (LOG.isVerboseEnabled()) {
byte[] fullResponse = ByteStreams.toByteArray(jsonInput);
jsonInput.close();
jsonInput = new ByteArrayInputStream(fullResponse);
LOG.verbose("Full JSON: " + new String(fullResponse, Charsets.UTF_8).trim());
}
JsonParser jsonParser = objectMapper.getJsonFactory().createJsonParser(jsonInput);
PathEventBuilder builder = new PathEventBuilder();
JsonToken token = jsonParser.nextToken();
/*
* Watchman returns changes as an array of JSON objects with potentially unstable key
* ordering:
* {
* "files": [
* {
* "new": false,
* "exists": true,
* "name": "bin/buckd",
* },
* ]
* }
* A simple way to parse these changes is to collect the relevant values from each object
* in a builder and then build an event when the end of a JSON object is reached. When the end
* of the enclosing JSON object is processed the builder will not contain a complete event, so
* the object end token will be ignored.
*/
int eventCount = 0;
while (token != null) {
boolean shouldOverflow = false;
if (eventCount > overflow) {
LOG.warn(
"Received too many events from Watchmen (%d > overflow max %d), posting overflow " +
"event and giving up.",
eventCount,
overflow);
shouldOverflow = true;
} else {
long elapsedMillis = clock.currentTimeMillis() - parseStartTimeMillis;
if (elapsedMillis >= timeoutMillis) {
LOG.warn(
"Parsing took too long (timeout %d ms), posting overflow event and giving up.",
timeoutMillis);
shouldOverflow = true;
}
}
if (shouldOverflow) {
postWatchEvent(createOverflowEvent());
watchmanProcess.destroy();
return;
}
switch (token) {
case FIELD_NAME:
String fieldName = jsonParser.getCurrentName();
switch (fieldName) {
case "is_fresh_instance":
// Force caches to be invalidated --- we have no idea what's happening.
Boolean newInstance = jsonParser.nextBooleanValue();
if (newInstance) {
LOG.info("Fresh watchman instance detected. " +
"Posting overflow event to flush caches.");
postWatchEvent(createOverflowEvent());
}
break;
case "name":
builder.setPath(Paths.get(jsonParser.nextTextValue()));
break;
case "new":
if (jsonParser.nextBooleanValue()) {
builder.setCreationEvent();
}
break;
case "exists":
if (!jsonParser.nextBooleanValue()) {
builder.setDeletionEvent();
}
break;
case "error":
WatchmanWatcherException e = new WatchmanWatcherException(
jsonParser.nextTextValue());
LOG.error(e, "Error in Watchman output");
throw e;
}
break;
case END_OBJECT:
if (builder.canBuild()) {
postWatchEvent(builder.build());
++eventCount;
}
builder = new PathEventBuilder();
break;
// $CASES-OMITTED$
default:
break;
}
token = jsonParser.nextToken();
}
int watchmanExitCode;
LOG.debug("Posted %d Watchman events. Waiting for subprocess to exit...", eventCount);
watchmanExitCode = watchmanProcess.waitFor();
if (watchmanExitCode != 0) {
LOG.error("Watchman exited with error code %d", watchmanExitCode);
postWatchEvent(createOverflowEvent()); // Events may have been lost, signal overflow.
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
ByteStreams.copy(watchmanProcess.getErrorStream(), buffer);
throw new WatchmanWatcherException(
"Watchman failed with exit code " + watchmanExitCode + ": " + buffer.toString());
} else {
LOG.debug("Watchman exited cleanly.");
}
} catch (InterruptedException e) {
LOG.warn(e, "Killing Watchman process on interrupted exception");
postWatchEvent(createOverflowEvent()); // Events may have been lost, signal overflow.
watchmanProcess.destroy();
Thread.currentThread().interrupt();
throw e;
} catch (IOException e) {
LOG.error(e, "Killing Watchman process on I/O exception");
postWatchEvent(createOverflowEvent()); // Events may have been lost, signal overflow.
watchmanProcess.destroy();
throw e;
}
}
private void postWatchEvent(WatchEvent<?> event) {
LOG.verbose("Posting WatchEvent: %s", event);
eventBus.post(event);
}
private WatchEvent<Object> createOverflowEvent() {
return new WatchEvent<Object>() {
@Override
public Kind<Object> kind() {
return StandardWatchEventKinds.OVERFLOW;
}
@Override
public int count() {
return 1;
}
@Override
@Nullable
public Object context() {
return null;
}
@Override
public String toString() {
return "Watchman Overflow WatchEvent " + kind();
}
};
}
@Override
public void close() throws IOException {
}
private static class PathEventBuilder {
private WatchEvent.Kind<Path> kind;
@Nullable private Path path;
PathEventBuilder() {
this.kind = StandardWatchEventKinds.ENTRY_MODIFY;
}
public void setCreationEvent() {
if (kind != StandardWatchEventKinds.ENTRY_DELETE) {
kind = StandardWatchEventKinds.ENTRY_CREATE;
}
}
public void setDeletionEvent() {
kind = StandardWatchEventKinds.ENTRY_DELETE;
}
public void setPath(Path path) {
this.path = path;
}
public WatchEvent<Path> build() {
Preconditions.checkNotNull(path);
return new WatchEvent<Path>() {
@Override
public Kind<Path> kind() {
return kind;
}
@Override
public int count() {
return 1;
}
@Override
@Nullable
public Path context() {
return path;
}
@Override
public String toString() {
return "Watchman Path WatchEvent " + kind + " " + path;
}
};
}
public boolean canBuild() {
return path != null;
}
}
/**
* @return true if "watchman --version" can be executed successfully
*/
public static boolean isWatchmanAvailable() throws InterruptedException {
try {
LOG.debug("Checking if Watchman is available..");
boolean available = new ProcessBuilder("watchman", "--version").start().waitFor() == 0;
LOG.debug("Watchman available: %d", available);
return available;
} catch (IOException e) {
LOG.error(e, "Could not check if Watchman is available");
return false; // Could not execute watchman.
}
}
}