blob: 047a96e09a8fd15a3fc91960ecd5955cc9b6ac49 [file] [log] [blame]
// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.googlesource.gerrit.plugins.kafka.rest;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.base.Function;
import com.google.common.flogger.FluentLogger;
import com.google.common.net.MediaType;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.common.Nullable;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.Enumeration;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.config.RequestConfig.Builder;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.log4j.Appender;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
public class KafkaRestClient {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private static final String KAFKA_V2_JSON = "application/vnd.kafka.json.v2+json";
private static final String KAFKA_V2 = "application/vnd.kafka.v2+json";
private final HttpHostProxy proxy;
private final CloseableHttpAsyncClient httpclient;
private final ExecutorService futureExecutor;
private final int kafkaRestApiTimeoutMsec;
private final KafkaProperties configuration;
private static boolean logConfigured;
public interface Factory {
KafkaRestClient create(KafkaProperties configuration);
}
@Inject
public KafkaRestClient(
HttpHostProxy httpHostProxy,
@FutureExecutor ExecutorService executor,
HttpAsyncClientBuilderFactory credentialsFactory,
@Assisted KafkaProperties configuration)
throws URISyntaxException {
proxy = httpHostProxy;
httpclient = proxy.apply(credentialsFactory.create()).build();
httpclient.start();
this.configuration = configuration;
kafkaRestApiTimeoutMsec = (int) configuration.getRestApiTimeout().toMillis();
if (configuration.isHttpWireLog()) {
enableHttpWireLog();
}
this.futureExecutor = executor;
}
public static void enableHttpWireLog() {
if (!logConfigured) {
Logger httpWireLoggger = Logger.getLogger("org.apache.http.wire");
httpWireLoggger.setLevel(Level.DEBUG);
@SuppressWarnings("rawtypes")
Enumeration rootLoggerAppenders = LogManager.getRootLogger().getAllAppenders();
while (rootLoggerAppenders.hasMoreElements()) {
Appender logAppender = (Appender) rootLoggerAppenders.nextElement();
if (logAppender instanceof AppenderSkeleton) {
((AppenderSkeleton) logAppender).setThreshold(Level.DEBUG);
}
httpWireLoggger.addAppender(logAppender);
}
logConfigured = true;
}
}
public ListenableFuture<HttpResponse> execute(HttpRequestBase request, int... expectedStatuses) {
return Futures.transformAsync(
listenableFutureOf(httpclient.execute(request, null)),
(res) -> {
IOException exc =
getResponseException(
String.format("HTTP %s %s FAILED", request.getMethod(), request.getURI()),
res,
expectedStatuses);
if (exc == null) {
return Futures.immediateFuture(res);
}
return Futures.immediateFailedFuture(exc);
},
futureExecutor);
}
public <I, O> ListenableFuture<O> mapAsync(
ListenableFuture<I> inputFuture, AsyncFunction<? super I, ? extends O> mapFunction) {
return Futures.transformAsync(inputFuture, mapFunction, futureExecutor);
}
public <I, O> ListenableFuture<O> map(
ListenableFuture<I> inputFuture, Function<? super I, ? extends O> mapFunction) {
return Futures.transform(inputFuture, mapFunction, futureExecutor);
}
public HttpGet createGetTopic(String topic) {
HttpGet get = new HttpGet(resolveKafkaRestApiUri("/topics/" + topic));
get.addHeader(HttpHeaders.ACCEPT, KAFKA_V2);
get.setConfig(createRequestConfig());
return get;
}
public HttpGet createGetRecords(URI consumerUri) {
HttpGet get = new HttpGet(consumerUri.resolve(consumerUri.getPath() + "/records"));
get.addHeader(HttpHeaders.ACCEPT, KAFKA_V2_JSON);
get.setConfig(createRequestConfig());
return get;
}
public HttpPost createPostToConsumer(String consumerGroup) {
HttpPost post =
new HttpPost(
resolveKafkaRestApiUri("/consumers/" + URLEncoder.encode(consumerGroup, UTF_8)));
post.addHeader(HttpHeaders.ACCEPT, MediaType.ANY_TYPE.toString());
post.setConfig(createRequestConfig());
post.setEntity(
new StringEntity(
"{\"format\": \"json\",\"auto.offset.reset\": \"earliest\", \"auto.commit.enable\":\"true\", \"consumer.request.timeout.ms\": \"1000\"}",
ContentType.create(KAFKA_V2, UTF_8)));
return post;
}
public HttpDelete createDeleteToConsumer(URI consumerUri) {
HttpDelete delete = new HttpDelete(consumerUri);
delete.addHeader(HttpHeaders.ACCEPT, "*/*");
delete.setConfig(createRequestConfig());
return delete;
}
public HttpDelete createDeleteToConsumerSubscriptions(URI consumerUri) {
URI subscriptionUri = consumerUri.resolve("subscription");
HttpDelete delete = new HttpDelete(subscriptionUri);
delete.addHeader(HttpHeaders.ACCEPT, "*/*");
delete.setConfig(createRequestConfig());
return delete;
}
public HttpPost createPostToSubscribe(URI consumerUri, String topic) {
HttpPost post = new HttpPost(consumerUri.resolve(consumerUri.getPath() + "/subscription"));
post.addHeader(HttpHeaders.ACCEPT, "*/*");
post.setConfig(createRequestConfig());
post.setEntity(
new StringEntity(
String.format("{\"topics\":[\"%s\"]}", topic), ContentType.create(KAFKA_V2, UTF_8)));
return post;
}
public HttpPost createPostToTopic(String topic, HttpEntity postBodyEntity) {
HttpPost post =
new HttpPost(resolveKafkaRestApiUri("/topics/" + URLEncoder.encode(topic, UTF_8)));
post.addHeader(HttpHeaders.ACCEPT, "*/*");
post.setConfig(createRequestConfig());
post.setEntity(postBodyEntity);
return post;
}
public HttpPost createPostSeekTopicFromBeginning(
URI consumerUri, String topic, Set<Integer> partitions) {
HttpPost post =
new HttpPost(consumerUri.resolve(consumerUri.getPath() + "/positions/beginning"));
post.addHeader(HttpHeaders.ACCEPT, "*/*");
post.setConfig(createRequestConfig());
post.setEntity(
new StringEntity(
String.format(
"{\"partitions\":[%s]}",
partitions.stream()
.map(
partition ->
String.format("{\"topic\":\"%s\",\"partition\":%d}", topic, partition))
.collect(Collectors.joining(","))),
ContentType.create(KAFKA_V2, UTF_8)));
return post;
}
@Nullable
public IOException getResponseException(
String errorMessage, HttpResponse response, int... okHttpStatuses) {
int responseHttpStatus = response.getStatusLine().getStatusCode();
if (okHttpStatuses.length == 0) {
okHttpStatuses =
new int[] {HttpStatus.SC_OK, HttpStatus.SC_CREATED, HttpStatus.SC_NO_CONTENT};
}
for (int httpStatus : okHttpStatuses) {
if (responseHttpStatus == httpStatus) {
return null;
}
}
String responseBody = "";
try {
responseBody = getStringEntity(response);
} catch (IOException e) {
logger.atWarning().withCause(e).log(
"Unable to extrace the string entity for response %d (%s)",
response.getStatusLine().getStatusCode(), response.getStatusLine().getReasonPhrase());
}
return new IOException(
String.format(
"%s\nHTTP status %d (%s)\n%s",
errorMessage,
response.getStatusLine().getStatusCode(),
response.getStatusLine().getReasonPhrase(),
responseBody));
}
protected String getStringEntity(HttpResponse response) throws IOException {
HttpEntity entity = response.getEntity();
if (entity != null) {
try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) {
entity.writeTo(outStream);
outStream.close();
return outStream.toString(UTF_8);
}
}
return "";
}
private <V> ListenableFuture<V> listenableFutureOf(Future<V> future) {
return JdkFutureAdapters.listenInPoolThread(future, futureExecutor);
}
private RequestConfig createRequestConfig() {
Builder configBuilder =
RequestConfig.custom()
.setConnectionRequestTimeout(kafkaRestApiTimeoutMsec)
.setConnectTimeout(kafkaRestApiTimeoutMsec)
.setSocketTimeout(kafkaRestApiTimeoutMsec);
configBuilder = proxy.apply(configBuilder);
RequestConfig config = configBuilder.build();
return config;
}
public void close() throws IOException {
httpclient.close();
}
public URI resolveKafkaRestApiUri(String path) {
try {
URI restApiUri = configuration.getRestApiUri();
return restApiUri.resolve(path);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid Kafka REST API URI", e);
}
}
public URI resolveKafkaRestApiUri(String kafkaRestId, String path) {
URI restApiUri;
try {
restApiUri = configuration.getRestApiUri(kafkaRestId);
return restApiUri.resolve(restApiUri.getPath() + path);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Invalid Kafka REST API URI", e);
}
}
}