Add PatchLineComments to RebuildNotedb

Change-Id: I3074c91afe0f084cb51087d9ce615d984794f4eb
diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java
index f497444..21b7132 100644
--- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java
+++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java
@@ -32,7 +32,9 @@
 import com.google.gerrit.pgm.util.ThreadLimiter;
 import com.google.gerrit.reviewdb.client.Change;
 import com.google.gerrit.reviewdb.client.Project;
+import com.google.gerrit.reviewdb.client.RefNames;
 import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.config.AllUsersName;
 import com.google.gerrit.server.git.GitRepositoryManager;
 import com.google.gerrit.server.git.MultiProgressMonitor;
 import com.google.gerrit.server.git.MultiProgressMonitor.Task;
@@ -48,13 +50,20 @@
 import com.google.inject.TypeLiteral;
 
 import org.eclipse.jgit.lib.BatchRefUpdate;
+import org.eclipse.jgit.lib.NullProgressMonitor;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Ref;
+import org.eclipse.jgit.lib.RefDatabase;
 import org.eclipse.jgit.lib.Repository;
 import org.eclipse.jgit.revwalk.RevWalk;
+import org.eclipse.jgit.transport.ReceiveCommand;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -93,100 +102,98 @@
     Stopwatch sw = Stopwatch.createStarted();
     GitRepositoryManager repoManager =
         sysInjector.getInstance(GitRepositoryManager.class);
+    final Project.NameKey allUsersName =
+        sysInjector.getInstance(AllUsersName.class);
+    final Repository allUsersRepo = repoManager.openRepository(allUsersName);
+    try {
+      deleteDraftRefs(allUsersRepo);
+      for (final Project.NameKey project : changesByProject.keySet()) {
+        final Repository repo = repoManager.openRepository(project);
+        try {
+          final BatchRefUpdate bru = repo.getRefDatabase().newBatchUpdate();
+          final BatchRefUpdate bruForDrafts =
+              allUsersRepo.getRefDatabase().newBatchUpdate();
+          List<ListenableFuture<?>> futures = Lists.newArrayList();
 
-    for (final Project.NameKey project : changesByProject.keySet()) {
-      final Repository repo = repoManager.openRepository(project);
-      try {
-        final BatchRefUpdate bru = repo.getRefDatabase().newBatchUpdate();
-        List<ListenableFuture<?>> futures = Lists.newArrayList();
+          // Here, we truncate the project name to 50 characters to ensure that
+          // the whole monitor line for a project fits on one line (<80 chars).
+          final MultiProgressMonitor mpm = new MultiProgressMonitor(System.out,
+              truncateProjectName(project.get()));
+          final Task doneTask =
+              mpm.beginSubTask("done", changesByProject.get(project).size());
+          final Task failedTask =
+              mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
 
-        // Here, we truncate the project name to 50 characters to ensure that
-        // the whole monitor line for a project fits on one line (<80 chars).
-        int monitorStringMaxLength = 50;
-        String projectString = project.toString();
-        String monitorString = (projectString.length() > monitorStringMaxLength)
-            ? projectString.substring(0, monitorStringMaxLength)
-            : projectString;
-        if (projectString.length() > monitorString.length()) {
-          monitorString = monitorString + "...";
-        }
-        final MultiProgressMonitor mpm = new MultiProgressMonitor(System.out,
-            monitorString);
-        final Task doneTask =
-            mpm.beginSubTask("done", changesByProject.get(project).size());
-        final Task failedTask = mpm.beginSubTask("failed",
-            MultiProgressMonitor.UNKNOWN);
+          for (final Change c : changesByProject.get(project)) {
+            final ListenableFuture<?> future = rebuilder.rebuildAsync(c,
+                executor, bru, bruForDrafts, repo, allUsersRepo);
+            futures.add(future);
+            future.addListener(
+                new RebuildListener(c.getId(), future, ok, doneTask, failedTask),
+                MoreExecutors.sameThreadExecutor());
+          }
 
-        for (final Change c : changesByProject.get(project)) {
-          final ListenableFuture<?> future =
-              rebuilder.rebuildAsync(c, executor, bru);
-          futures.add(future);
-          future.addListener(new Runnable() {
-            @Override
-            public void run() {
-              try {
-                future.get();
-                doneTask.update(1);
-              } catch (ExecutionException | InterruptedException e) {
-                fail(e);
-              } catch (RuntimeException e) {
-                failAndThrow(e);
-              } catch (Error e) {
-                // Can't join with RuntimeException because "RuntimeException |
-                // Error" becomes Throwable, which messes with signatures.
-                failAndThrow(e);
-              }
-            }
-
-            private void fail(Throwable t) {
-              log.error("Failed to rebuild change " + c.getId(), t);
-              ok.set(false);
-              failedTask.update(1);
-            }
-
-            private void failAndThrow(RuntimeException e) {
-              fail(e);
-              throw e;
-            }
-
-            private void failAndThrow(Error e) {
-              fail(e);
-              throw e;
-            }
-          }, MoreExecutors.sameThreadExecutor());
-        }
-
-        mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
-            new AsyncFunction<List<?>, Void>() {
-                @Override
-              public ListenableFuture<Void> apply(List<?> input)
-                  throws Exception {
-                Task t = mpm.beginSubTask("update refs",
-                    MultiProgressMonitor.UNKNOWN);
-                RevWalk walk = new RevWalk(repo);
-                try {
-                  bru.execute(walk, t);
+          mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
+              new AsyncFunction<List<?>, Void>() {
+                  @Override
+                public ListenableFuture<Void> apply(List<?> input)
+                    throws Exception {
+                  execute(bru, repo);
+                  execute(bruForDrafts, allUsersRepo);
                   mpm.end();
                   return Futures.immediateFuture(null);
-                } finally {
-                  walk.release();
                 }
-              }
-            }));
-      } catch (Exception e) {
-        log.error("Error rebuilding notedb", e);
-        ok.set(false);
-        break;
-      } finally {
-        repo.close();
+              }));
+        } catch (Exception e) {
+          log.error("Error rebuilding notedb", e);
+          ok.set(false);
+          break;
+        } finally {
+          repo.close();
+        }
       }
+    } finally {
+      allUsersRepo.close();
     }
+
     double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
     System.out.format("Rebuild %d changes in %.01fs (%.01f/s)\n",
         changesByProject.size(), t, changesByProject.size() / t);
     return ok.get() ? 0 : 1;
   }
 
+  private static String truncateProjectName(String projectName) {
+    int monitorStringMaxLength = 50;
+    String monitorString = (projectName.length() > monitorStringMaxLength)
+        ? projectName.substring(0, monitorStringMaxLength)
+        : projectName;
+    if (projectName.length() > monitorString.length()) {
+      monitorString = monitorString + "...";
+    }
+    return monitorString;
+  }
+
+  private static void execute(BatchRefUpdate bru, Repository repo)
+      throws IOException {
+    RevWalk rw = new RevWalk(repo);
+    try {
+      bru.execute(rw, NullProgressMonitor.INSTANCE);
+    } finally {
+      rw.release();
+    }
+  }
+
+  private void deleteDraftRefs(Repository allUsersRepo) throws IOException {
+    RefDatabase refDb = allUsersRepo.getRefDatabase();
+    Map<String, Ref> allRefs = refDb.getRefs(RefNames.REFS_DRAFT_COMMENTS);
+    BatchRefUpdate bru = refDb.newBatchUpdate();
+    for (Map.Entry<String, Ref> ref : allRefs.entrySet()) {
+      bru.addCommand(new ReceiveCommand(ref.getValue().getObjectId(),
+          ObjectId.zeroId(), RefNames.REFS_DRAFT_COMMENTS + ref.getKey()));
+    }
+    execute(bru, allUsersRepo);
+  }
+
   private Injector createSysInjector() {
     return dbInjector.createChildInjector(new AbstractModule() {
       @Override
@@ -227,4 +234,54 @@
       db.close();
     }
   }
+
+  private class RebuildListener implements Runnable {
+    private Change.Id changeId;
+    private ListenableFuture<?> future;
+    private AtomicBoolean ok;
+    private Task doneTask;
+    private Task failedTask;
+
+
+    private RebuildListener(Change.Id changeId, ListenableFuture<?> future,
+        AtomicBoolean ok, Task doneTask, Task failedTask) {
+      this.changeId = changeId;
+      this.future = future;
+      this.ok = ok;
+      this.doneTask = doneTask;
+      this.failedTask = failedTask;
+    }
+
+    @Override
+    public void run() {
+      try {
+        future.get();
+        doneTask.update(1);
+      } catch (ExecutionException | InterruptedException e) {
+        fail(e);
+      } catch (RuntimeException e) {
+        failAndThrow(e);
+      } catch (Error e) {
+        // Can't join with RuntimeException because "RuntimeException
+        // | Error" becomes Throwable, which messes with signatures.
+        failAndThrow(e);
+      }
+    }
+
+    private void fail(Throwable t) {
+      log.error("Failed to rebuild change " + changeId, t);
+      ok.set(false);
+      failedTask.update(1);
+    }
+
+    private void failAndThrow(RuntimeException e) {
+      fail(e);
+      throw e;
+    }
+
+    private void failAndThrow(Error e) {
+      fail(e);
+      throw e;
+    }
+  }
 }
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java
index 910edf9..7485279 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java
@@ -15,21 +15,27 @@
 package com.google.gerrit.server.notedb;
 
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.gerrit.server.notedb.CommentsInNotesUtil.getCommentPsId;
+import static com.google.gerrit.server.PatchLineCommentsUtil.setCommentRevId;
 
 import com.google.common.base.Objects;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.gerrit.reviewdb.client.Account;
 import com.google.gerrit.reviewdb.client.Change;
+import com.google.gerrit.reviewdb.client.PatchLineComment;
+import com.google.gerrit.reviewdb.client.PatchLineComment.Status;
 import com.google.gerrit.reviewdb.client.PatchSet;
 import com.google.gerrit.reviewdb.client.PatchSetApproval;
 import com.google.gerrit.reviewdb.server.ReviewDb;
-import com.google.gerrit.server.GerritPersonIdent;
 import com.google.gerrit.server.IdentifiedUser;
-import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.PatchLineCommentsUtil;
 import com.google.gerrit.server.git.VersionedMetaData.BatchMetaDataUpdate;
+import com.google.gerrit.server.patch.PatchListCache;
 import com.google.gerrit.server.project.ChangeControl;
 import com.google.gerrit.server.project.NoSuchChangeException;
 import com.google.gwtorm.server.OrmException;
@@ -37,8 +43,9 @@
 import com.google.inject.Provider;
 
 import org.eclipse.jgit.lib.BatchRefUpdate;
-import org.eclipse.jgit.lib.CommitBuilder;
-import org.eclipse.jgit.lib.PersonIdent;
+import org.eclipse.jgit.lib.ObjectInserter;
+import org.eclipse.jgit.lib.RefUpdate;
+import org.eclipse.jgit.lib.Repository;
 
 import java.io.IOException;
 import java.sql.Timestamp;
@@ -52,57 +59,82 @@
   private static final long TS_WINDOW_MS =
       TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS);
 
-  private final PersonIdent serverIdent;
   private final Provider<ReviewDb> dbProvider;
   private final ChangeControl.GenericFactory controlFactory;
   private final IdentifiedUser.GenericFactory userFactory;
+  private final PatchListCache patchListCache;
   private final ChangeUpdate.Factory updateFactory;
+  private final ChangeDraftUpdate.Factory draftUpdateFactory;
 
   @Inject
-  ChangeRebuilder(@GerritPersonIdent PersonIdent serverIdent,
-      Provider<ReviewDb> dbProvider,
-      GitRepositoryManager repoManager,
+  ChangeRebuilder(Provider<ReviewDb> dbProvider,
       ChangeControl.GenericFactory controlFactory,
       IdentifiedUser.GenericFactory userFactory,
-      ChangeUpdate.Factory updateFactory) {
-    this.serverIdent = serverIdent;
+      PatchListCache patchListCache,
+      PatchLineCommentsUtil plcUtil,
+      ChangeUpdate.Factory updateFactory,
+      ChangeDraftUpdate.Factory draftUpdateFactory) {
     this.dbProvider = dbProvider;
     this.controlFactory = controlFactory;
     this.userFactory = userFactory;
+    this.patchListCache = patchListCache;
     this.updateFactory = updateFactory;
+    this.draftUpdateFactory = draftUpdateFactory;
   }
 
   public ListenableFuture<?> rebuildAsync(final Change change,
-      ListeningExecutorService executor, final BatchRefUpdate bru) {
+      ListeningExecutorService executor, final BatchRefUpdate bru,
+      final BatchRefUpdate bruForDrafts, final Repository changeRepo,
+      final Repository allUsersRepo) {
     return executor.submit(new Callable<Void>() {
         @Override
       public Void call() throws Exception {
-        rebuild(change, bru);
+        rebuild(change, bru, bruForDrafts, changeRepo, allUsersRepo);
         return null;
       }
     });
   }
 
-  public void rebuild(Change change, BatchRefUpdate bru)
-      throws NoSuchChangeException, IOException, OrmException {
+  public void rebuild(Change change, BatchRefUpdate bru,
+      BatchRefUpdate bruForDrafts, Repository changeRepo,
+      Repository allUsersRepo) throws NoSuchChangeException, IOException,
+      OrmException {
+    deleteRef(change, changeRepo);
     ReviewDb db = dbProvider.get();
     Change.Id changeId = change.getId();
 
+    // We will rebuild all events, except for draft comments, in buckets based
+    // on author and timestamp. However, all draft comments for a given change
+    // and author will be written as one commit in the notedb.
     List<Event> events = Lists.newArrayList();
+    Multimap<Account.Id, PatchLineCommentEvent> draftCommentEvents =
+        ArrayListMultimap.create();
+
     for (PatchSet ps : db.patchSets().byChange(changeId)) {
       events.add(new PatchSetEvent(ps));
+      for (PatchLineComment c : db.patchComments().byPatchSet(ps.getId())) {
+        PatchLineCommentEvent e =
+            new PatchLineCommentEvent(c, change, ps, patchListCache);
+        if (c.getStatus() == Status.PUBLISHED) {
+          events.add(e);
+        } else {
+          draftCommentEvents.put(c.getAuthor(), e);
+        }
+      }
     }
+
     for (PatchSetApproval psa : db.patchSetApprovals().byChange(changeId)) {
       events.add(new ApprovalEvent(psa));
     }
-    Collections.sort(events);
 
+
+    Collections.sort(events);
     BatchMetaDataUpdate batch = null;
     ChangeUpdate update = null;
     for (Event e : events) {
       if (!sameUpdate(e, update)) {
         if (update != null) {
-          writeToBatch(batch, update);
+          writeToBatch(batch, update, changeRepo);
         }
         IdentifiedUser user = userFactory.create(dbProvider, e.who);
         update = updateFactory.create(
@@ -116,7 +148,7 @@
     }
     if (batch != null) {
       if (update != null) {
-        writeToBatch(batch, update);
+        writeToBatch(batch, update, changeRepo);
       }
 
       // Since the BatchMetaDataUpdates generated by all ChangeRebuilders on a
@@ -128,13 +160,54 @@
         batch.commit();
       }
     }
+
+    for (Account.Id author : draftCommentEvents.keys()) {
+      IdentifiedUser user = userFactory.create(dbProvider, author);
+      ChangeDraftUpdate draftUpdate = null;
+      BatchMetaDataUpdate batchForDrafts = null;
+      for (PatchLineCommentEvent e : draftCommentEvents.get(author)) {
+        if (draftUpdate == null) {
+          draftUpdate = draftUpdateFactory.create(
+              controlFactory.controlFor(change, user), e.when);
+          draftUpdate.setPatchSetId(e.psId);
+          batchForDrafts = draftUpdate.openUpdateInBatch(bruForDrafts);
+        }
+        e.applyDraft(draftUpdate);
+      }
+      writeToBatch(batchForDrafts, draftUpdate, allUsersRepo);
+      synchronized(bruForDrafts) {
+        batchForDrafts.commit();
+      }
+    }
   }
 
-  private void writeToBatch(BatchMetaDataUpdate batch, ChangeUpdate update)
+  private void deleteRef(Change change, Repository changeRepo)
       throws IOException {
-    CommitBuilder commit = new CommitBuilder();
-    commit.setCommitter(new PersonIdent(serverIdent, update.getWhen()));
-    batch.write(update, commit);
+    String refName = ChangeNoteUtil.changeRefName(change.getId());
+    RefUpdate ru = changeRepo.updateRef(refName, true);
+    ru.setForceUpdate(true);
+    RefUpdate.Result result = ru.delete();
+    switch (result) {
+      case FORCED:
+      case NEW:
+      case NO_CHANGE:
+        break;
+      default:
+        throw new IOException(
+            String.format("Failed to delete ref %s: %s", refName, result));
+    }
+  }
+
+  private void writeToBatch(BatchMetaDataUpdate batch,
+      AbstractChangeUpdate update, Repository repo) throws IOException,
+      OrmException {
+    ObjectInserter inserter = repo.newObjectInserter();
+    try {
+      update.setInserter(inserter);
+      update.writeCommit(batch);
+    } finally {
+      inserter.release();
+    }
   }
 
   private static long round(Date when) {
@@ -159,7 +232,7 @@
       this.when = when;
     }
 
-    protected void checkUpdate(ChangeUpdate update) {
+    protected void checkUpdate(AbstractChangeUpdate update) {
       checkState(Objects.equal(update.getPatchSetId(), psId),
           "cannot apply event for %s to update for %s",
           update.getPatchSetId(), psId);
@@ -171,7 +244,7 @@
           who, update.getUser().getAccountId());
     }
 
-    abstract void apply(ChangeUpdate update);
+    abstract void apply(ChangeUpdate update) throws OrmException;
 
     @Override
     public int compareTo(Event other) {
@@ -228,4 +301,36 @@
       }
     }
   }
+
+  private static class PatchLineCommentEvent extends Event {
+    public final PatchLineComment c;
+    private final Change change;
+    private final PatchSet ps;
+    private final PatchListCache cache;
+
+    PatchLineCommentEvent(PatchLineComment c, Change change, PatchSet ps,
+        PatchListCache cache) {
+      super(getCommentPsId(c), c.getAuthor(), c.getWrittenOn());
+      this.c = c;
+      this.change = change;
+      this.ps = ps;
+      this.cache = cache;
+    }
+
+    @Override
+    void apply(ChangeUpdate update) throws OrmException {
+      checkUpdate(update);
+      if (c.getRevId() == null) {
+        setCommentRevId(c, cache, change, ps);
+      }
+      update.insertComment(c);
+    }
+
+    void applyDraft(ChangeDraftUpdate draftUpdate) throws OrmException {
+      if (c.getRevId() == null) {
+        setCommentRevId(c, cache, change, ps);
+      }
+      draftUpdate.insertComment(c);
+    }
+  }
 }