Extract configuration properties into constants

Put all values of configuration settings names and
their default values to the top of the KafkaProperties file.

Allow the developer to have a single point of the file
where to look for configuration settings, so that can be
easier to discover.

Change-Id: I3fd0aa48603fa19e037ef82cc03729c029f46675
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
index 6a8137d..e63b31a 100644
--- a/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
+++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/config/KafkaProperties.java
@@ -32,9 +32,18 @@
 
 @Singleton
 public class KafkaProperties extends java.util.Properties {
+  private static final String PROPERTY_HTTP_WIRE_LOG = "httpWireLog";
+  private static final boolean DEFAULT_HTTP_WIRE_LOG = false;
+  private static final String PROPERTY_REST_API_URI = "restApiUri";
+  private static final String PROPERTY_CLIENT_TYPE = "clientType";
+  private static final ClientType DEFAULT_CLIENT_TYPE = ClientType.NATIVE;
+  private static final String PROPERTY_SEND_ASYNC = "sendAsync";
+  private static final boolean DEFAULT_SEND_ASYNC = true;
+  private static final String PROPERTY_STREAM_EVENTS_TOPIC_NAME = "topic";
+  private static final String DEFAULT_STREAM_EVENTS_TOPIC_NAME = "gerrit";
+
   private static final long serialVersionUID = 0L;
 
-  private static final boolean DEFAULT_HTTP_WIRE_LOG = false;
   public static final String KAFKA_STRING_SERIALIZER = StringSerializer.class.getName();
 
   public enum ClientType {
@@ -53,13 +62,15 @@
     super();
     setDefaults();
     PluginConfig fromGerritConfig = configFactory.getFromGerritConfig(pluginName);
-    topic = fromGerritConfig.getString("topic", "gerrit");
-    sendAsync = fromGerritConfig.getBoolean("sendAsync", true);
-    clientType = fromGerritConfig.getEnum("clientType", ClientType.NATIVE);
+    topic =
+        fromGerritConfig.getString(
+            PROPERTY_STREAM_EVENTS_TOPIC_NAME, DEFAULT_STREAM_EVENTS_TOPIC_NAME);
+    sendAsync = fromGerritConfig.getBoolean(PROPERTY_SEND_ASYNC, DEFAULT_SEND_ASYNC);
+    clientType = fromGerritConfig.getEnum(PROPERTY_CLIENT_TYPE, DEFAULT_CLIENT_TYPE);
 
     switch (clientType) {
       case REST:
-        String restApiUriString = fromGerritConfig.getString("restApiUri");
+        String restApiUriString = fromGerritConfig.getString(PROPERTY_REST_API_URI);
         if (Strings.isNullOrEmpty(restApiUriString)) {
           throw new IllegalArgumentException("Missing REST API URI in Kafka properties");
         }
@@ -69,7 +80,7 @@
         } catch (URISyntaxException e) {
           throw new IllegalArgumentException("Invalid Kafka REST API URI: " + restApiUriString, e);
         }
-        httpWireLog = fromGerritConfig.getBoolean("httpWireLog", DEFAULT_HTTP_WIRE_LOG);
+        httpWireLog = fromGerritConfig.getBoolean(PROPERTY_HTTP_WIRE_LOG, DEFAULT_HTTP_WIRE_LOG);
         break;
       case NATIVE:
       default:
@@ -86,7 +97,7 @@
   public KafkaProperties(boolean sendAsync, ClientType clientType, @Nullable URI restApiURI) {
     super();
     setDefaults();
-    topic = "gerrit";
+    topic = DEFAULT_STREAM_EVENTS_TOPIC_NAME;
     this.sendAsync = sendAsync;
     this.clientType = clientType;
     this.restApiUri = restApiURI;