| // Copyright (C) 2021 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package com.googlesource.gerrit.plugins.kafka.publish; |
| |
| import com.google.common.flogger.FluentLogger; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.inject.Inject; |
| import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties; |
| import com.googlesource.gerrit.plugins.kafka.rest.KafkaRestClient; |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.http.HttpStatus; |
| import org.apache.http.client.methods.HttpPost; |
| import org.apache.http.entity.ContentType; |
| import org.apache.http.entity.StringEntity; |
| import org.apache.kafka.clients.consumer.OffsetAndMetadata; |
| import org.apache.kafka.clients.producer.Callback; |
| import org.apache.kafka.clients.producer.Producer; |
| import org.apache.kafka.clients.producer.ProducerRecord; |
| import org.apache.kafka.clients.producer.RecordMetadata; |
| import org.apache.kafka.common.Metric; |
| import org.apache.kafka.common.MetricName; |
| import org.apache.kafka.common.PartitionInfo; |
| import org.apache.kafka.common.TopicPartition; |
| import org.apache.kafka.common.errors.ProducerFencedException; |
| |
| public class KafkaRestProducer implements Producer<String, String> { |
| private static final RecordMetadata ZEROS_RECORD_METADATA = |
| new RecordMetadata(null, 0, 0, 0, null, 0, 0); |
| private static final FluentLogger logger = FluentLogger.forEnclosingClass(); |
| private static final String KAFKA_V2_JSON = "application/vnd.kafka.json.v2+json"; |
| private final KafkaRestClient restClient; |
| |
| @Inject |
| public KafkaRestProducer(KafkaProperties kafkaConf, KafkaRestClient.Factory restClientFactory) { |
| restClient = restClientFactory.create(kafkaConf); |
| } |
| |
| @Override |
| public void initTransactions() { |
| unsupported(); |
| } |
| |
| @Override |
| public void beginTransaction() throws ProducerFencedException { |
| unsupported(); |
| } |
| |
| @Override |
| public void sendOffsetsToTransaction( |
| Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) |
| throws ProducerFencedException { |
| unsupported(); |
| } |
| |
| @Override |
| public void commitTransaction() throws ProducerFencedException { |
| unsupported(); |
| } |
| |
| @Override |
| public void abortTransaction() throws ProducerFencedException { |
| unsupported(); |
| } |
| |
| @Override |
| public Future<RecordMetadata> send(ProducerRecord<String, String> record) { |
| return send(record, null); |
| } |
| |
| @Override |
| public Future<RecordMetadata> send(ProducerRecord<String, String> record, Callback callback) { |
| HttpPost post = |
| restClient.createPostToTopic( |
| record.topic(), |
| new StringEntity( |
| getRecordAsJson(record), |
| ContentType.create(KAFKA_V2_JSON, StandardCharsets.UTF_8))); |
| return restClient.mapAsync( |
| restClient.execute(post, HttpStatus.SC_OK), |
| (res) -> Futures.immediateFuture(ZEROS_RECORD_METADATA)); |
| } |
| |
| @Override |
| public void flush() { |
| unsupported(); |
| } |
| |
| @Override |
| public List<PartitionInfo> partitionsFor(String topic) { |
| return unsupported(); |
| } |
| |
| @Override |
| public Map<MetricName, ? extends Metric> metrics() { |
| return unsupported(); |
| } |
| |
| @Override |
| public void close() { |
| try { |
| restClient.close(); |
| } catch (IOException e) { |
| logger.atWarning().withCause(e).log("Unable to close httpclient"); |
| } |
| } |
| |
| @Override |
| public void close(long timeout, TimeUnit unit) { |
| close(); |
| } |
| |
| private String getRecordAsJson(ProducerRecord<String, String> record) { |
| return String.format( |
| "{\"records\":[{\"key\":\"%s\",\"value\":%s}]}", record.key(), record.value()); |
| } |
| |
| private <T> T unsupported() { |
| throw new IllegalArgumentException("Unsupported method"); |
| } |
| } |