| // Copyright (C) 2021 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.googlesource.gerrit.plugins.kafka.rest; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| |
| import com.google.common.base.Function; |
| import com.google.common.flogger.FluentLogger; |
| import com.google.common.net.MediaType; |
| import com.google.common.util.concurrent.AsyncFunction; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.JdkFutureAdapters; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.gerrit.common.Nullable; |
| import com.google.inject.Inject; |
| import com.google.inject.assistedinject.Assisted; |
| import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.net.URLEncoder; |
| import java.util.Enumeration; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.stream.Collectors; |
| import org.apache.http.HttpEntity; |
| import org.apache.http.HttpHeaders; |
| import org.apache.http.HttpResponse; |
| import org.apache.http.HttpStatus; |
| import org.apache.http.client.config.RequestConfig; |
| import org.apache.http.client.config.RequestConfig.Builder; |
| import org.apache.http.client.methods.HttpDelete; |
| import org.apache.http.client.methods.HttpGet; |
| import org.apache.http.client.methods.HttpPost; |
| import org.apache.http.client.methods.HttpRequestBase; |
| import org.apache.http.entity.ContentType; |
| import org.apache.http.entity.StringEntity; |
| import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; |
| import org.apache.log4j.Appender; |
| import org.apache.log4j.AppenderSkeleton; |
| import org.apache.log4j.Level; |
| import org.apache.log4j.LogManager; |
| import org.apache.log4j.Logger; |
| |
| public class KafkaRestClient { |
| private static final FluentLogger logger = FluentLogger.forEnclosingClass(); |
| private static final String KAFKA_V2_JSON = "application/vnd.kafka.json.v2+json"; |
| private static final String KAFKA_V2 = "application/vnd.kafka.v2+json"; |
| |
| private final HttpHostProxy proxy; |
| private final CloseableHttpAsyncClient httpclient; |
| private final ExecutorService futureExecutor; |
| private final int kafkaRestApiTimeoutMsec; |
| private final KafkaProperties configuration; |
| |
| private static boolean logConfigured; |
| |
| public interface Factory { |
| KafkaRestClient create(KafkaProperties configuration); |
| } |
| |
| @Inject |
| public KafkaRestClient( |
| HttpHostProxy httpHostProxy, |
| @FutureExecutor ExecutorService executor, |
| HttpAsyncClientBuilderFactory credentialsFactory, |
| @Assisted KafkaProperties configuration) |
| throws URISyntaxException { |
| proxy = httpHostProxy; |
| httpclient = proxy.apply(credentialsFactory.create()).build(); |
| httpclient.start(); |
| this.configuration = configuration; |
| kafkaRestApiTimeoutMsec = (int) configuration.getRestApiTimeout().toMillis(); |
| if (configuration.isHttpWireLog()) { |
| enableHttpWireLog(); |
| } |
| this.futureExecutor = executor; |
| } |
| |
| public static void enableHttpWireLog() { |
| if (!logConfigured) { |
| Logger httpWireLoggger = Logger.getLogger("org.apache.http.wire"); |
| httpWireLoggger.setLevel(Level.DEBUG); |
| |
| @SuppressWarnings("rawtypes") |
| Enumeration rootLoggerAppenders = LogManager.getRootLogger().getAllAppenders(); |
| while (rootLoggerAppenders.hasMoreElements()) { |
| Appender logAppender = (Appender) rootLoggerAppenders.nextElement(); |
| if (logAppender instanceof AppenderSkeleton) { |
| ((AppenderSkeleton) logAppender).setThreshold(Level.DEBUG); |
| } |
| httpWireLoggger.addAppender(logAppender); |
| } |
| |
| logConfigured = true; |
| } |
| } |
| |
| public ListenableFuture<HttpResponse> execute(HttpRequestBase request, int... expectedStatuses) { |
| return Futures.transformAsync( |
| listenableFutureOf(httpclient.execute(request, null)), |
| (res) -> { |
| IOException exc = |
| getResponseException( |
| String.format("HTTP %s %s FAILED", request.getMethod(), request.getURI()), |
| res, |
| expectedStatuses); |
| if (exc == null) { |
| return Futures.immediateFuture(res); |
| } |
| return Futures.immediateFailedFuture(exc); |
| }, |
| futureExecutor); |
| } |
| |
| public <I, O> ListenableFuture<O> mapAsync( |
| ListenableFuture<I> inputFuture, AsyncFunction<? super I, ? extends O> mapFunction) { |
| return Futures.transformAsync(inputFuture, mapFunction, futureExecutor); |
| } |
| |
| public <I, O> ListenableFuture<O> map( |
| ListenableFuture<I> inputFuture, Function<? super I, ? extends O> mapFunction) { |
| return Futures.transform(inputFuture, mapFunction, futureExecutor); |
| } |
| |
| public HttpGet createGetTopic(String topic) { |
| HttpGet get = new HttpGet(resolveKafkaRestApiUri("/topics/" + topic)); |
| get.addHeader(HttpHeaders.ACCEPT, KAFKA_V2); |
| get.setConfig(createRequestConfig()); |
| return get; |
| } |
| |
| public HttpGet createGetRecords(URI consumerUri) { |
| HttpGet get = new HttpGet(consumerUri.resolve(consumerUri.getPath() + "/records")); |
| get.addHeader(HttpHeaders.ACCEPT, KAFKA_V2_JSON); |
| get.setConfig(createRequestConfig()); |
| return get; |
| } |
| |
| public HttpPost createPostToConsumer(String consumerGroup) { |
| HttpPost post = |
| new HttpPost( |
| resolveKafkaRestApiUri("/consumers/" + URLEncoder.encode(consumerGroup, UTF_8))); |
| post.addHeader(HttpHeaders.ACCEPT, MediaType.ANY_TYPE.toString()); |
| post.setConfig(createRequestConfig()); |
| post.setEntity( |
| new StringEntity( |
| "{\"format\": \"json\",\"auto.offset.reset\": \"earliest\", \"auto.commit.enable\":\"true\", \"consumer.request.timeout.ms\": \"1000\"}", |
| ContentType.create(KAFKA_V2, UTF_8))); |
| return post; |
| } |
| |
| public HttpDelete createDeleteToConsumer(URI consumerUri) { |
| HttpDelete delete = new HttpDelete(consumerUri); |
| delete.addHeader(HttpHeaders.ACCEPT, "*/*"); |
| delete.setConfig(createRequestConfig()); |
| return delete; |
| } |
| |
| public HttpDelete createDeleteToConsumerSubscriptions(URI consumerUri) { |
| URI subscriptionUri = consumerUri.resolve("subscription"); |
| HttpDelete delete = new HttpDelete(subscriptionUri); |
| delete.addHeader(HttpHeaders.ACCEPT, "*/*"); |
| delete.setConfig(createRequestConfig()); |
| return delete; |
| } |
| |
| public HttpPost createPostToSubscribe(URI consumerUri, String topic) { |
| HttpPost post = new HttpPost(consumerUri.resolve(consumerUri.getPath() + "/subscription")); |
| post.addHeader(HttpHeaders.ACCEPT, "*/*"); |
| post.setConfig(createRequestConfig()); |
| post.setEntity( |
| new StringEntity( |
| String.format("{\"topics\":[\"%s\"]}", topic), ContentType.create(KAFKA_V2, UTF_8))); |
| return post; |
| } |
| |
| public HttpPost createPostToTopic(String topic, HttpEntity postBodyEntity) { |
| HttpPost post = |
| new HttpPost(resolveKafkaRestApiUri("/topics/" + URLEncoder.encode(topic, UTF_8))); |
| post.addHeader(HttpHeaders.ACCEPT, "*/*"); |
| post.setConfig(createRequestConfig()); |
| post.setEntity(postBodyEntity); |
| return post; |
| } |
| |
| public HttpPost createPostSeekTopicFromBeginning( |
| URI consumerUri, String topic, Set<Integer> partitions) { |
| HttpPost post = |
| new HttpPost(consumerUri.resolve(consumerUri.getPath() + "/positions/beginning")); |
| post.addHeader(HttpHeaders.ACCEPT, "*/*"); |
| post.setConfig(createRequestConfig()); |
| post.setEntity( |
| new StringEntity( |
| String.format( |
| "{\"partitions\":[%s]}", |
| partitions.stream() |
| .map( |
| partition -> |
| String.format("{\"topic\":\"%s\",\"partition\":%d}", topic, partition)) |
| .collect(Collectors.joining(","))), |
| ContentType.create(KAFKA_V2, UTF_8))); |
| return post; |
| } |
| |
| @Nullable |
| public IOException getResponseException( |
| String errorMessage, HttpResponse response, int... okHttpStatuses) { |
| int responseHttpStatus = response.getStatusLine().getStatusCode(); |
| if (okHttpStatuses.length == 0) { |
| okHttpStatuses = |
| new int[] {HttpStatus.SC_OK, HttpStatus.SC_CREATED, HttpStatus.SC_NO_CONTENT}; |
| } |
| for (int httpStatus : okHttpStatuses) { |
| if (responseHttpStatus == httpStatus) { |
| return null; |
| } |
| } |
| |
| String responseBody = ""; |
| try { |
| responseBody = getStringEntity(response); |
| } catch (IOException e) { |
| logger.atWarning().withCause(e).log( |
| "Unable to extrace the string entity for response %d (%s)", |
| response.getStatusLine().getStatusCode(), response.getStatusLine().getReasonPhrase()); |
| } |
| |
| return new IOException( |
| String.format( |
| "%s\nHTTP status %d (%s)\n%s", |
| errorMessage, |
| response.getStatusLine().getStatusCode(), |
| response.getStatusLine().getReasonPhrase(), |
| responseBody)); |
| } |
| |
| protected String getStringEntity(HttpResponse response) throws IOException { |
| HttpEntity entity = response.getEntity(); |
| if (entity != null) { |
| try (ByteArrayOutputStream outStream = new ByteArrayOutputStream()) { |
| entity.writeTo(outStream); |
| outStream.close(); |
| return outStream.toString(UTF_8); |
| } |
| } |
| return ""; |
| } |
| |
| private <V> ListenableFuture<V> listenableFutureOf(Future<V> future) { |
| return JdkFutureAdapters.listenInPoolThread(future, futureExecutor); |
| } |
| |
| private RequestConfig createRequestConfig() { |
| Builder configBuilder = |
| RequestConfig.custom() |
| .setConnectionRequestTimeout(kafkaRestApiTimeoutMsec) |
| .setConnectTimeout(kafkaRestApiTimeoutMsec) |
| .setSocketTimeout(kafkaRestApiTimeoutMsec); |
| configBuilder = proxy.apply(configBuilder); |
| RequestConfig config = configBuilder.build(); |
| return config; |
| } |
| |
| public void close() throws IOException { |
| httpclient.close(); |
| } |
| |
| public URI resolveKafkaRestApiUri(String path) { |
| try { |
| URI restApiUri = configuration.getRestApiUri(); |
| return restApiUri.resolve(path); |
| } catch (URISyntaxException e) { |
| throw new IllegalArgumentException("Invalid Kafka REST API URI", e); |
| } |
| } |
| |
| public URI resolveKafkaRestApiUri(String kafkaRestId, String path) { |
| URI restApiUri; |
| try { |
| restApiUri = configuration.getRestApiUri(kafkaRestId); |
| return restApiUri.resolve(restApiUri.getPath() + path); |
| } catch (URISyntaxException e) { |
| throw new IllegalArgumentException("Invalid Kafka REST API URI", e); |
| } |
| } |
| } |