sync: fix saving of fetch times and local state

Interleaved sync didn't save _fetch_times and _local_sync_state to disk.
Phased sync saved them, but incorrectly applied moving average smoothing
repeatedly when fetching submodules, and discarded historical data
during partial syncs.

Move .Save() calls to the end of main sync loops to ensure they run
once. Update _FetchTimes.Save() to merge new data with existing history,
preventing data loss.

Change-Id: I174f98a62ac86859f1eeea1daba65eb35c227852
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/519821
Commit-Queue: Gavin Mak <gavinmak@google.com>
Reviewed-by: Scott Lee <ddoman@google.com>
Tested-by: Gavin Mak <gavinmak@google.com>
diff --git a/subcmds/sync.py b/subcmds/sync.py
index 582bd05..f950031 100644
--- a/subcmds/sync.py
+++ b/subcmds/sync.py
@@ -975,9 +975,6 @@
                 sync_event.set()
                 sync_progress_thread.join()
 
-        self._fetch_times.Save()
-        self._local_sync_state.Save()
-
         if not self.outer_client.manifest.IsArchive:
             self._GCProjects(projects, opt, err_event)
 
@@ -1003,53 +1000,58 @@
         to_fetch.extend(all_projects)
         to_fetch.sort(key=self._fetch_times.Get, reverse=True)
 
-        result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
-        success = result.success
-        fetched = result.projects
-        if not success:
-            err_event.set()
-
-        if opt.network_only:
-            # Bail out now; the rest touches the working tree.
-            if err_event.is_set():
-                e = SyncError(
-                    "error: Exited sync due to fetch errors.",
-                    aggregate_errors=errors,
-                )
-
-                logger.error(e)
-                raise e
-            return _FetchMainResult([])
-
-        # Iteratively fetch missing and/or nested unregistered submodules.
-        previously_missing_set = set()
-        while True:
-            self._ReloadManifest(None, manifest)
-            all_projects = self.GetProjects(
-                args,
-                missing_ok=True,
-                submodules_ok=opt.fetch_submodules,
-                manifest=manifest,
-                all_manifests=not opt.this_manifest_only,
-            )
-            missing = []
-            for project in all_projects:
-                if project.gitdir not in fetched:
-                    missing.append(project)
-            if not missing:
-                break
-            # Stop us from non-stopped fetching actually-missing repos: If set
-            # of missing repos has not been changed from last fetch, we break.
-            missing_set = {p.name for p in missing}
-            if previously_missing_set == missing_set:
-                break
-            previously_missing_set = missing_set
-            result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
+        try:
+            result = self._Fetch(to_fetch, opt, err_event, ssh_proxy, errors)
             success = result.success
-            new_fetched = result.projects
+            fetched = result.projects
             if not success:
                 err_event.set()
-            fetched.update(new_fetched)
+
+            if opt.network_only:
+                # Bail out now; the rest touches the working tree.
+                if err_event.is_set():
+                    e = SyncError(
+                        "error: Exited sync due to fetch errors.",
+                        aggregate_errors=errors,
+                    )
+
+                    logger.error(e)
+                    raise e
+                return _FetchMainResult([])
+
+            # Iteratively fetch missing and/or nested unregistered submodules.
+            previously_missing_set = set()
+            while True:
+                self._ReloadManifest(None, manifest)
+                all_projects = self.GetProjects(
+                    args,
+                    missing_ok=True,
+                    submodules_ok=opt.fetch_submodules,
+                    manifest=manifest,
+                    all_manifests=not opt.this_manifest_only,
+                )
+                missing = []
+                for project in all_projects:
+                    if project.gitdir not in fetched:
+                        missing.append(project)
+                if not missing:
+                    break
+                # Stop us from non-stopped fetching actually-missing repos: If
+                # set of missing repos has not been changed from last fetch, we
+                # break.
+                missing_set = {p.name for p in missing}
+                if previously_missing_set == missing_set:
+                    break
+                previously_missing_set = missing_set
+                result = self._Fetch(missing, opt, err_event, ssh_proxy, errors)
+                success = result.success
+                new_fetched = result.projects
+                if not success:
+                    err_event.set()
+                fetched.update(new_fetched)
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         return _FetchMainResult(all_projects)
 
@@ -2491,107 +2493,120 @@
         sync_event = _threading.Event()
         sync_progress_thread = self._CreateSyncProgressThread(pm, sync_event)
 
-        with multiprocessing.Manager() as manager, ssh.ProxyManager(
-            manager
-        ) as ssh_proxy:
-            ssh_proxy.sock()
-            with self.ParallelContext():
-                self.get_parallel_context()["ssh_proxy"] = ssh_proxy
-                # TODO(gavinmak): Use multprocessing.Queue instead of dict.
-                self.get_parallel_context()[
-                    "sync_dict"
-                ] = multiprocessing.Manager().dict()
-                sync_progress_thread.start()
+        try:
+            with multiprocessing.Manager() as manager, ssh.ProxyManager(
+                manager
+            ) as ssh_proxy:
+                ssh_proxy.sock()
+                with self.ParallelContext():
+                    self.get_parallel_context()["ssh_proxy"] = ssh_proxy
+                    # TODO(gavinmak): Use multprocessing.Queue instead of dict.
+                    self.get_parallel_context()[
+                        "sync_dict"
+                    ] = multiprocessing.Manager().dict()
+                    sync_progress_thread.start()
 
-                try:
-                    # Outer loop for dynamic project discovery. This continues
-                    # until no unsynced projects remain.
-                    while True:
-                        projects_to_sync = [
-                            p
-                            for p in project_list
-                            if p.relpath not in finished_relpaths
-                        ]
-                        if not projects_to_sync:
-                            break
+                    try:
+                        # Outer loop for dynamic project discovery. This
+                        # continues until no unsynced projects remain.
+                        while True:
+                            projects_to_sync = [
+                                p
+                                for p in project_list
+                                if p.relpath not in finished_relpaths
+                            ]
+                            if not projects_to_sync:
+                                break
 
-                        pending_relpaths = {p.relpath for p in projects_to_sync}
-                        if previously_pending_relpaths == pending_relpaths:
-                            stalled_projects_str = "\n".join(
-                                f" - {path}"
-                                for path in sorted(list(pending_relpaths))
-                            )
-                            logger.error(
-                                "The following projects failed and could not "
-                                "be synced:\n%s",
-                                stalled_projects_str,
-                            )
-                            err_event.set()
-                            break
-                        previously_pending_relpaths = pending_relpaths
-
-                        self.get_parallel_context()[
-                            "projects"
-                        ] = projects_to_sync
-                        project_index_map = {
-                            p: i for i, p in enumerate(projects_to_sync)
-                        }
-
-                        # Inner loop to process projects in a hierarchical
-                        # order. This iterates through levels of project
-                        # dependencies (e.g. 'foo' then 'foo/bar'). All projects
-                        # in one level can be processed in parallel, but we must
-                        # wait for a level to complete before starting the next.
-                        for level_projects in _SafeCheckoutOrder(
-                            projects_to_sync
-                        ):
-                            if not level_projects:
-                                continue
-
-                            objdir_project_map = collections.defaultdict(list)
-                            for p in level_projects:
-                                objdir_project_map[p.objdir].append(
-                                    project_index_map[p]
+                            pending_relpaths = {
+                                p.relpath for p in projects_to_sync
+                            }
+                            if previously_pending_relpaths == pending_relpaths:
+                                stalled_projects_str = "\n".join(
+                                    f" - {path}"
+                                    for path in sorted(list(pending_relpaths))
                                 )
-
-                            work_items = list(objdir_project_map.values())
-                            if not work_items:
-                                continue
-
-                            jobs = max(1, min(opt.jobs, len(work_items)))
-                            callback = functools.partial(
-                                self._ProcessSyncInterleavedResults,
-                                finished_relpaths,
-                                err_event,
-                                errors,
-                                opt,
-                            )
-                            if not self.ExecuteInParallel(
-                                jobs,
-                                functools.partial(self._SyncProjectList, opt),
-                                work_items,
-                                callback=callback,
-                                output=pm,
-                                chunksize=1,
-                                initializer=self.InitWorker,
-                            ):
+                                logger.error(
+                                    "The following projects failed and could "
+                                    "not be synced:\n%s",
+                                    stalled_projects_str,
+                                )
                                 err_event.set()
+                                break
+                            previously_pending_relpaths = pending_relpaths
 
-                            if err_event.is_set() and opt.fail_fast:
-                                raise SyncFailFastError(aggregate_errors=errors)
+                            self.get_parallel_context()[
+                                "projects"
+                            ] = projects_to_sync
+                            project_index_map = {
+                                p: i for i, p in enumerate(projects_to_sync)
+                            }
 
-                        self._ReloadManifest(None, manifest)
-                        project_list = self.GetProjects(
-                            args,
-                            missing_ok=True,
-                            submodules_ok=opt.fetch_submodules,
-                            manifest=manifest,
-                            all_manifests=not opt.this_manifest_only,
-                        )
-                        pm.update_total(len(project_list))
-                finally:
-                    sync_event.set()
-                    sync_progress_thread.join()
+                            # Inner loop to process projects in a hierarchical
+                            # order. This iterates through levels of project
+                            # dependencies (e.g. 'foo' then 'foo/bar'). All
+                            # projects in one level can be processed in
+                            # parallel, but we must wait for a level to complete
+                            # before starting the next.
+                            for level_projects in _SafeCheckoutOrder(
+                                projects_to_sync
+                            ):
+                                if not level_projects:
+                                    continue
+
+                                objdir_project_map = collections.defaultdict(
+                                    list
+                                )
+                                for p in level_projects:
+                                    objdir_project_map[p.objdir].append(
+                                        project_index_map[p]
+                                    )
+
+                                work_items = list(objdir_project_map.values())
+                                if not work_items:
+                                    continue
+
+                                jobs = max(1, min(opt.jobs, len(work_items)))
+                                callback = functools.partial(
+                                    self._ProcessSyncInterleavedResults,
+                                    finished_relpaths,
+                                    err_event,
+                                    errors,
+                                    opt,
+                                )
+                                if not self.ExecuteInParallel(
+                                    jobs,
+                                    functools.partial(
+                                        self._SyncProjectList, opt
+                                    ),
+                                    work_items,
+                                    callback=callback,
+                                    output=pm,
+                                    chunksize=1,
+                                    initializer=self.InitWorker,
+                                ):
+                                    err_event.set()
+
+                                if err_event.is_set() and opt.fail_fast:
+                                    raise SyncFailFastError(
+                                        aggregate_errors=errors
+                                    )
+
+                            self._ReloadManifest(None, manifest)
+                            project_list = self.GetProjects(
+                                args,
+                                missing_ok=True,
+                                submodules_ok=opt.fetch_submodules,
+                                manifest=manifest,
+                                all_manifests=not opt.this_manifest_only,
+                            )
+                            pm.update_total(len(project_list))
+                    finally:
+                        sync_event.set()
+                        sync_progress_thread.join()
+        finally:
+            self._fetch_times.Save()
+            self._local_sync_state.Save()
 
         pm.end()
 
@@ -2695,17 +2710,19 @@
                 self._saved = {}
 
     def Save(self):
-        if self._saved is None:
+        if not self._seen:
             return
 
+        self._Load()
+
         for name, t in self._seen.items():
             # Keep a moving average across the previous/current sync runs.
             old = self._saved.get(name, t)
-            self._seen[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
+            self._saved[name] = (self._ALPHA * t) + ((1 - self._ALPHA) * old)
 
         try:
             with open(self._path, "w") as f:
-                json.dump(self._seen, f, indent=2)
+                json.dump(self._saved, f, indent=2)
         except (OSError, TypeError):
             platform_utils.remove(self._path, missing_ok=True)
 
diff --git a/tests/test_subcmds_sync.py b/tests/test_subcmds_sync.py
index 6c9cc9a..6eb8a5a 100644
--- a/tests/test_subcmds_sync.py
+++ b/tests/test_subcmds_sync.py
@@ -681,6 +681,9 @@
         # Mock _GetCurrentBranchOnly for worker tests.
         mock.patch.object(sync.Sync, "_GetCurrentBranchOnly").start()
 
+        self.cmd._fetch_times = mock.Mock()
+        self.cmd._local_sync_state = mock.Mock()
+
     def tearDown(self):
         """Clean up resources."""
         shutil.rmtree(self.repodir)