Make gr-rest-api-helper use schedulers for scheduling requests.

Google-Bug-Id: b/73958279
Change-Id: I6be8a3607893b6257de624c70529f9b1205b0275
diff --git a/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-api-impl.ts b/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-api-impl.ts
index 5c4f090..ae6268d 100644
--- a/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-api-impl.ts
+++ b/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-api-impl.ts
@@ -156,6 +156,9 @@
 import {ParsedChangeInfo} from '../../../types/types';
 import {ErrorCallback} from '../../../api/rest';
 import {addDraftProp, DraftInfo} from '../../../utils/comment-util';
+import {BaseScheduler} from '../../../services/scheduler/scheduler';
+import {RetryScheduler} from '../../../services/scheduler/retry-scheduler';
+import {MaxInFlightScheduler} from '../../../services/scheduler/max-in-flight-scheduler';
 
 const MAX_PROJECT_RESULTS = 25;
 
@@ -279,6 +282,22 @@
   }
 }
 
+function createReadScheduler() {
+  return new RetryScheduler<Response>(
+    new MaxInFlightScheduler<Response>(new BaseScheduler<Response>(), 10),
+    3,
+    50
+  );
+}
+
+function createWriteScheduler() {
+  return new RetryScheduler<Response>(
+    new MaxInFlightScheduler<Response>(new BaseScheduler<Response>(), 5),
+    3,
+    50
+  );
+}
+
 @customElement('gr-rest-api-service-impl')
 export class GrRestApiServiceImpl
   extends PolymerElement
@@ -308,7 +327,9 @@
     this._restApiHelper = new GrRestApiHelper(
       this._cache,
       this.authService,
-      this._sharedFetchPromises
+      this._sharedFetchPromises,
+      createReadScheduler(),
+      createWriteScheduler()
     );
   }
 
diff --git a/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-apis/gr-rest-api-helper.ts b/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-apis/gr-rest-api-helper.ts
index 8db6606..332e0c5 100644
--- a/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-apis/gr-rest-api-helper.ts
+++ b/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-apis/gr-rest-api-helper.ts
@@ -31,6 +31,8 @@
 import {fireNetworkError, fireServerError} from '../../../../utils/event-util';
 import {FetchRequest} from '../../../../types/types';
 import {ErrorCallback} from '../../../../api/rest';
+import {Scheduler, Task} from '../../../../services/scheduler/scheduler';
+import {RetryError} from '../../../../services/scheduler/retry-scheduler';
 
 export const JSON_PREFIX = ")]}'";
 
@@ -236,20 +238,45 @@
   constructor(
     private readonly _cache: SiteBasedCache,
     private readonly _auth: AuthService,
-    private readonly _fetchPromisesCache: FetchPromisesCache
+    private readonly _fetchPromisesCache: FetchPromisesCache,
+    private readonly readScheduler: Scheduler<Response>,
+    private readonly writeScheduler: Scheduler<Response>
   ) {}
 
+  private schedule(method: string, task: Task<Response>) {
+    if (method === 'PUT' || method === 'POST' || method === 'DELETE') {
+      return this.writeScheduler.schedule(task);
+    } else {
+      return this.readScheduler.schedule(task);
+    }
+  }
+
   /**
    * Wraps calls to the underlying authenticated fetch function (_auth.fetch)
    * with timing and logging.
 s   */
   fetch(req: FetchRequest): Promise<Response> {
+    const method =
+      req.fetchOptions && req.fetchOptions.method
+        ? req.fetchOptions.method
+        : 'GET';
     const start = Date.now();
-    const xhr = this._auth.fetch(req.url, req.fetchOptions);
+    const task = async () => {
+      const res = await this._auth.fetch(req.url, req.fetchOptions);
+      if (!res.ok && res.status === 429) throw new RetryError<Response>(res);
+      return res;
+    };
+
+    const xhr = this.schedule(method, task).catch((err: unknown) => {
+      if (err instanceof RetryError) {
+        return err.payload;
+      } else {
+        throw err;
+      }
+    });
 
     // Log the call after it completes.
     xhr.then(res => this._logCall(req, start, res ? res.status : null));
-
     // Return the XHR directly (without the log).
     return xhr;
   }
diff --git a/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-apis/gr-rest-api-helper_test.js b/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-apis/gr-rest-api-helper_test.js
deleted file mode 100644
index e520287..0000000
--- a/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-apis/gr-rest-api-helper_test.js
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * @license
- * Copyright (C) 2019 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import '../../../../test/common-test-setup-karma.js';
-import {SiteBasedCache, FetchPromisesCache, GrRestApiHelper} from './gr-rest-api-helper.js';
-import {getAppContext} from '../../../../services/app-context.js';
-import {stubAuth} from '../../../../test/test-utils.js';
-
-suite('gr-rest-api-helper tests', () => {
-  let helper;
-
-  let cache;
-  let fetchPromisesCache;
-  let originalCanonicalPath;
-  let authFetchStub;
-
-  setup(() => {
-    cache = new SiteBasedCache();
-    fetchPromisesCache = new FetchPromisesCache();
-
-    originalCanonicalPath = window.CANONICAL_PATH;
-    window.CANONICAL_PATH = 'testhelper';
-
-    const mockRestApiInterface = {
-      fire: sinon.stub(),
-    };
-
-    const testJSON = ')]}\'\n{"hello": "bonjour"}';
-    authFetchStub = stubAuth('fetch').returns(Promise.resolve({
-      ok: true,
-      text() {
-        return Promise.resolve(testJSON);
-      },
-    }));
-
-    helper = new GrRestApiHelper(cache, getAppContext().authService,
-        fetchPromisesCache, mockRestApiInterface);
-  });
-
-  teardown(() => {
-    window.CANONICAL_PATH = originalCanonicalPath;
-  });
-
-  suite('fetchJSON()', () => {
-    test('Sets header to accept application/json', () => {
-      helper.fetchJSON({url: '/dummy/url'});
-      assert.isTrue(authFetchStub.called);
-      assert.equal(authFetchStub.lastCall.args[1].headers.get('Accept'),
-          'application/json');
-    });
-
-    test('Use header option accept when provided', () => {
-      const headers = new Headers();
-      headers.append('Accept', '*/*');
-      const fetchOptions = {headers};
-      helper.fetchJSON({url: '/dummy/url', fetchOptions});
-      assert.isTrue(authFetchStub.called);
-      assert.equal(authFetchStub.lastCall.args[1].headers.get('Accept'),
-          '*/*');
-    });
-  });
-
-  test('JSON prefix is properly removed',
-      () => helper.fetchJSON({url: '/dummy/url'}).then(obj => {
-        assert.deepEqual(obj, {hello: 'bonjour'});
-      })
-  );
-
-  test('cached results', () => {
-    let n = 0;
-    sinon.stub(helper, 'fetchJSON').callsFake(() => Promise.resolve(++n));
-    const promises = [];
-    promises.push(helper.fetchCacheURL('/foo'));
-    promises.push(helper.fetchCacheURL('/foo'));
-    promises.push(helper.fetchCacheURL('/foo'));
-
-    return Promise.all(promises).then(results => {
-      assert.deepEqual(results, [1, 1, 1]);
-      return helper.fetchCacheURL('/foo').then(foo => {
-        assert.equal(foo, 1);
-      });
-    });
-  });
-
-  test('cached promise', () => {
-    const promise = Promise.reject(new Error('foo'));
-    cache.set('/foo', promise);
-    return helper.fetchCacheURL({url: '/foo'}).catch(p => {
-      assert.equal(p.message, 'foo');
-    });
-  });
-
-  test('cache invalidation', () => {
-    cache.set('/foo/bar', 1);
-    cache.set('/bar', 2);
-    fetchPromisesCache.set('/foo/bar', 3);
-    fetchPromisesCache.set('/bar', 4);
-    helper.invalidateFetchPromisesPrefix('/foo/');
-    assert.isFalse(cache.has('/foo/bar'));
-    assert.isTrue(cache.has('/bar'));
-    assert.isUndefined(fetchPromisesCache.get('/foo/bar'));
-    assert.strictEqual(4, fetchPromisesCache.get('/bar'));
-  });
-
-  test('params are properly encoded', () => {
-    let url = helper.urlWithParams('/path/', {
-      sp: 'hola',
-      gr: 'guten tag',
-      noval: null,
-    });
-    assert.equal(url,
-        window.CANONICAL_PATH + '/path/?sp=hola&gr=guten%20tag&noval');
-
-    url = helper.urlWithParams('/path/', {
-      sp: 'hola',
-      en: ['hey', 'hi'],
-    });
-    assert.equal(url, window.CANONICAL_PATH + '/path/?sp=hola&en=hey&en=hi');
-
-    // Order must be maintained with array params.
-    url = helper.urlWithParams('/path/', {
-      l: ['c', 'b', 'a'],
-    });
-    assert.equal(url, window.CANONICAL_PATH + '/path/?l=c&l=b&l=a');
-  });
-
-  test('request callbacks can be canceled', () => {
-    let cancelCalled = false;
-    authFetchStub.returns(Promise.resolve({
-      body: {
-        cancel() { cancelCalled = true; },
-      },
-    }));
-    const cancelCondition = () => true;
-    return helper.fetchJSON({url: '/dummy/url', cancelCondition}).then(obj => {
-      assert.isUndefined(obj);
-      assert.isTrue(cancelCalled);
-    });
-  });
-});
-
diff --git a/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-apis/gr-rest-api-helper_test.ts b/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-apis/gr-rest-api-helper_test.ts
new file mode 100644
index 0000000..209cb5c
--- /dev/null
+++ b/polygerrit-ui/app/elements/shared/gr-rest-api-interface/gr-rest-apis/gr-rest-api-helper_test.ts
@@ -0,0 +1,313 @@
+/**
+ * @license
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import '../../../../test/common-test-setup-karma';
+import {
+  SiteBasedCache,
+  FetchPromisesCache,
+  GrRestApiHelper,
+} from './gr-rest-api-helper';
+import {getAppContext} from '../../../../services/app-context';
+import {stubAuth} from '../../../../test/test-utils';
+import {FakeScheduler} from '../../../../services/scheduler/fake-scheduler';
+import {RetryScheduler} from '../../../../services/scheduler/retry-scheduler';
+import {ParsedJSON} from '../../../../types/common';
+import {HttpMethod} from '../../../../api/rest-api';
+import {SinonFakeTimers} from 'sinon';
+
+function makeParsedJSON<T>(val: T): ParsedJSON {
+  return val as unknown as ParsedJSON;
+}
+
+suite('gr-rest-api-helper tests', () => {
+  let clock: SinonFakeTimers;
+  let helper: GrRestApiHelper;
+
+  let cache: SiteBasedCache;
+  let fetchPromisesCache: FetchPromisesCache;
+  let originalCanonicalPath: string | undefined;
+  let authFetchStub: sinon.SinonStub;
+  let readScheduler: FakeScheduler<Response>;
+  let writeScheduler: FakeScheduler<Response>;
+
+  setup(() => {
+    clock = sinon.useFakeTimers();
+    cache = new SiteBasedCache();
+    fetchPromisesCache = new FetchPromisesCache();
+
+    originalCanonicalPath = window.CANONICAL_PATH;
+    window.CANONICAL_PATH = 'testhelper';
+
+    const testJSON = ')]}\'\n{"hello": "bonjour"}';
+    authFetchStub = stubAuth('fetch').returns(
+      Promise.resolve({
+        ...new Response(),
+        ok: true,
+        text() {
+          return Promise.resolve(testJSON);
+        },
+      })
+    );
+
+    readScheduler = new FakeScheduler<Response>();
+    writeScheduler = new FakeScheduler<Response>();
+
+    helper = new GrRestApiHelper(
+      cache,
+      getAppContext().authService,
+      fetchPromisesCache,
+      readScheduler,
+      writeScheduler
+    );
+  });
+
+  teardown(() => {
+    window.CANONICAL_PATH = originalCanonicalPath;
+  });
+
+  async function assertReadRequest() {
+    assert.equal(readScheduler.scheduled.length, 1);
+    await readScheduler.resolve();
+    await flush();
+  }
+
+  async function assertWriteRequest() {
+    assert.equal(writeScheduler.scheduled.length, 1);
+    await writeScheduler.resolve();
+    await flush();
+  }
+
+  suite('send()', () => {
+    setup(() => {
+      authFetchStub.returns(
+        Promise.resolve({
+          ...new Response(),
+          ok: true,
+          text() {
+            return Promise.resolve('Yay');
+          },
+        })
+      );
+    });
+
+    test('GET are sent to readScheduler', async () => {
+      const promise = helper.send({
+        method: HttpMethod.GET,
+        url: '/dummy/url',
+        parseResponse: false,
+      });
+      assert.equal(writeScheduler.scheduled.length, 0);
+      await assertReadRequest();
+      const res: Response = (await promise) as Response;
+      assert.equal(await res.text(), 'Yay');
+    });
+
+    test('PUT are sent to writeScheduler', async () => {
+      const promise = helper.send({
+        method: HttpMethod.PUT,
+        url: '/dummy/url',
+        parseResponse: false,
+      });
+      assert.equal(readScheduler.scheduled.length, 0);
+      await assertWriteRequest();
+      const res: Response = (await promise) as Response;
+      assert.equal(await res.text(), 'Yay');
+    });
+  });
+
+  suite('fetchJSON()', () => {
+    test('Sets header to accept application/json', async () => {
+      helper.fetchJSON({url: '/dummy/url'});
+      assert.isFalse(authFetchStub.called);
+      await assertReadRequest();
+      assert.isTrue(authFetchStub.called);
+      assert.equal(
+        authFetchStub.lastCall.args[1].headers.get('Accept'),
+        'application/json'
+      );
+    });
+
+    test('Use header option accept when provided', async () => {
+      const headers = new Headers();
+      headers.append('Accept', '*/*');
+      const fetchOptions = {headers};
+      helper.fetchJSON({url: '/dummy/url', fetchOptions});
+      assert.isFalse(authFetchStub.called);
+      await assertReadRequest();
+      assert.isTrue(authFetchStub.called);
+      assert.equal(authFetchStub.lastCall.args[1].headers.get('Accept'), '*/*');
+    });
+
+    test('JSON prefix is properly removed', async () => {
+      const promise = helper.fetchJSON({url: '/dummy/url'});
+      await assertReadRequest();
+      const obj = await promise;
+      assert.deepEqual(obj, makeParsedJSON({hello: 'bonjour'}));
+    });
+  });
+
+  test('cached results', () => {
+    let n = 0;
+    sinon
+      .stub(helper, 'fetchJSON')
+      .callsFake(() => Promise.resolve(makeParsedJSON(++n)));
+    const promises = [];
+    promises.push(helper.fetchCacheURL({url: '/foo'}));
+    promises.push(helper.fetchCacheURL({url: '/foo'}));
+    promises.push(helper.fetchCacheURL({url: '/foo'}));
+
+    return Promise.all(promises).then(results => {
+      assert.deepEqual(results, [
+        makeParsedJSON(1),
+        makeParsedJSON(1),
+        makeParsedJSON(1),
+      ]);
+      return helper.fetchCacheURL({url: '/foo'}).then(foo => {
+        assert.equal(foo, makeParsedJSON(1));
+      });
+    });
+  });
+
+  test('cache invalidation', async () => {
+    cache.set('/foo/bar', makeParsedJSON(1));
+    cache.set('/bar', makeParsedJSON(2));
+    fetchPromisesCache.set('/foo/bar', Promise.resolve(makeParsedJSON(3)));
+    fetchPromisesCache.set('/bar', Promise.resolve(makeParsedJSON(4)));
+    helper.invalidateFetchPromisesPrefix('/foo/');
+    assert.isFalse(cache.has('/foo/bar'));
+    assert.isTrue(cache.has('/bar'));
+    assert.isUndefined(fetchPromisesCache.get('/foo/bar'));
+    assert.strictEqual(makeParsedJSON(4), await fetchPromisesCache.get('/bar'));
+  });
+
+  test('params are properly encoded', () => {
+    let url = helper.urlWithParams('/path/', {
+      sp: 'hola',
+      gr: 'guten tag',
+      noval: null,
+    });
+    assert.equal(
+      url,
+      `${window.CANONICAL_PATH}/path/?sp=hola&gr=guten%20tag&noval`
+    );
+
+    url = helper.urlWithParams('/path/', {
+      sp: 'hola',
+      en: ['hey', 'hi'],
+    });
+    assert.equal(url, `${window.CANONICAL_PATH}/path/?sp=hola&en=hey&en=hi`);
+
+    // Order must be maintained with array params.
+    url = helper.urlWithParams('/path/', {
+      l: ['c', 'b', 'a'],
+    });
+    assert.equal(url, `${window.CANONICAL_PATH}/path/?l=c&l=b&l=a`);
+  });
+
+  test('request callbacks can be canceled', async () => {
+    let cancelCalled = false;
+    authFetchStub.returns(
+      Promise.resolve({
+        body: {
+          cancel() {
+            cancelCalled = true;
+          },
+        },
+      })
+    );
+    const cancelCondition = () => true;
+    const promise = helper.fetchJSON({url: '/dummy/url', cancelCondition});
+    await assertReadRequest();
+    const obj = await promise;
+    assert.isUndefined(obj);
+    assert.isTrue(cancelCalled);
+  });
+
+  suite('429 errors', () => {
+    setup(() => {
+      authFetchStub.returns(
+        Promise.resolve({
+          ...new Response(),
+          status: 429,
+          ok: false,
+        })
+      );
+    });
+
+    test('still call errFn when not retried', async () => {
+      const errFn = sinon.stub();
+      const promise = helper.send({
+        method: HttpMethod.GET,
+        url: '/dummy/url',
+        parseResponse: false,
+        errFn,
+      });
+      await assertReadRequest();
+
+      // But we expect the result from the network to return a 429 error when
+      // it's no longer being retried.
+      await promise;
+      assert.isTrue(errFn.called);
+    });
+
+    test('still pass through correctly when not retried', async () => {
+      const promise = helper.send({
+        method: HttpMethod.GET,
+        url: '/dummy/url',
+        parseResponse: false,
+      });
+      await assertReadRequest();
+
+      // But we expect the result from the network to return a 429 error when
+      // it's no longer being retried.
+      const res: Response = (await promise) as Response;
+      assert.equal(res.status, 429);
+    });
+
+    test('are retried', async () => {
+      helper = new GrRestApiHelper(
+        cache,
+        getAppContext().authService,
+        fetchPromisesCache,
+        new RetryScheduler<Response>(readScheduler, 1, 50),
+        writeScheduler
+      );
+      const promise = helper.send({
+        method: HttpMethod.GET,
+        url: '/dummy/url',
+        parseResponse: false,
+      });
+      await assertReadRequest();
+      authFetchStub.returns(
+        Promise.resolve({
+          ...new Response(),
+          ok: true,
+          text() {
+            return Promise.resolve('Yay');
+          },
+        })
+      );
+      // Flush the retry scheduler
+      clock.tick(50);
+      await flush();
+      // We expect a retry.
+      await assertReadRequest();
+      const res: Response = (await promise) as Response;
+      assert.equal(await res.text(), 'Yay');
+    });
+  });
+});