platform_utils: delete unused FileDescriptorStreams APIs

Now that we've converted the few users of this over to subprocess APIs,
we don't need this anymore.  It's been a bit hairy to maintain across
different operating systems, so there's no desire to bring it back.

Using multiprocessing Pool to batch things has been working better in
general anyways.

Change-Id: I10769e96f60ecf27a80d8cc2aa0d1b199085252e
Reviewed-on: https://gerrit-review.googlesource.com/c/git-repo/+/297682
Reviewed-by: Michael Mortensen <mmortensen@google.com>
Tested-by: Mike Frysinger <vapier@google.com>
diff --git a/platform_utils.py b/platform_utils.py
index a280982..00c51d9 100644
--- a/platform_utils.py
+++ b/platform_utils.py
@@ -15,11 +15,8 @@
 import errno
 import os
 import platform
-from queue import Queue
-import select
 import shutil
 import stat
-from threading import Thread
 
 
 def isWindows():
@@ -31,161 +28,6 @@
   return platform.system() == "Windows"
 
 
-class FileDescriptorStreams(object):
-  """ Platform agnostic abstraction enabling non-blocking I/O over a
-  collection of file descriptors. This abstraction is required because
-  fctnl(os.O_NONBLOCK) is not supported on Windows.
-  """
-  @classmethod
-  def create(cls):
-    """ Factory method: instantiates the concrete class according to the
-    current platform.
-    """
-    if isWindows():
-      return _FileDescriptorStreamsThreads()
-    else:
-      return _FileDescriptorStreamsNonBlocking()
-
-  def __init__(self):
-    self.streams = []
-
-  def add(self, fd, dest, std_name):
-    """ Wraps an existing file descriptor as a stream.
-    """
-    self.streams.append(self._create_stream(fd, dest, std_name))
-
-  def remove(self, stream):
-    """ Removes a stream, when done with it.
-    """
-    self.streams.remove(stream)
-
-  @property
-  def is_done(self):
-    """ Returns True when all streams have been processed.
-    """
-    return len(self.streams) == 0
-
-  def select(self):
-    """ Returns the set of streams that have data available to read.
-    The returned streams each expose a read() and a close() method.
-    When done with a stream, call the remove(stream) method.
-    """
-    raise NotImplementedError
-
-  def _create_stream(self, fd, dest, std_name):
-    """ Creates a new stream wrapping an existing file descriptor.
-    """
-    raise NotImplementedError
-
-
-class _FileDescriptorStreamsNonBlocking(FileDescriptorStreams):
-  """ Implementation of FileDescriptorStreams for platforms that support
-  non blocking I/O.
-  """
-  def __init__(self):
-    super(_FileDescriptorStreamsNonBlocking, self).__init__()
-    self._poll = select.poll()
-    self._fd_to_stream = {}
-
-  class Stream(object):
-    """ Encapsulates a file descriptor """
-
-    def __init__(self, fd, dest, std_name):
-      self.fd = fd
-      self.dest = dest
-      self.std_name = std_name
-      self.set_non_blocking()
-
-    def set_non_blocking(self):
-      import fcntl
-      flags = fcntl.fcntl(self.fd, fcntl.F_GETFL)
-      fcntl.fcntl(self.fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
-
-    def fileno(self):
-      return self.fd.fileno()
-
-    def read(self):
-      return self.fd.read(4096)
-
-    def close(self):
-      self.fd.close()
-
-  def _create_stream(self, fd, dest, std_name):
-    stream = self.Stream(fd, dest, std_name)
-    self._fd_to_stream[stream.fileno()] = stream
-    self._poll.register(stream, select.POLLIN)
-    return stream
-
-  def remove(self, stream):
-    self._poll.unregister(stream)
-    del self._fd_to_stream[stream.fileno()]
-    super(_FileDescriptorStreamsNonBlocking, self).remove(stream)
-
-  def select(self):
-    return [self._fd_to_stream[fd] for fd, _ in self._poll.poll()]
-
-
-class _FileDescriptorStreamsThreads(FileDescriptorStreams):
-  """ Implementation of FileDescriptorStreams for platforms that don't support
-  non blocking I/O. This implementation requires creating threads issuing
-  blocking read operations on file descriptors.
-  """
-
-  def __init__(self):
-    super(_FileDescriptorStreamsThreads, self).__init__()
-    # The queue is shared accross all threads so we can simulate the
-    # behavior of the select() function
-    self.queue = Queue(10)  # Limit incoming data from streams
-
-  def _create_stream(self, fd, dest, std_name):
-    return self.Stream(fd, dest, std_name, self.queue)
-
-  def select(self):
-    # Return only one stream at a time, as it is the most straighforward
-    # thing to do and it is compatible with the select() function.
-    item = self.queue.get()
-    stream = item.stream
-    stream.data = item.data
-    return [stream]
-
-  class QueueItem(object):
-    """ Item put in the shared queue """
-
-    def __init__(self, stream, data):
-      self.stream = stream
-      self.data = data
-
-  class Stream(object):
-    """ Encapsulates a file descriptor """
-
-    def __init__(self, fd, dest, std_name, queue):
-      self.fd = fd
-      self.dest = dest
-      self.std_name = std_name
-      self.queue = queue
-      self.data = None
-      self.thread = Thread(target=self.read_to_queue)
-      self.thread.daemon = True
-      self.thread.start()
-
-    def close(self):
-      self.fd.close()
-
-    def read(self):
-      data = self.data
-      self.data = None
-      return data
-
-    def read_to_queue(self):
-      """ The thread function: reads everything from the file descriptor into
-      the shared queue and terminates when reaching EOF.
-      """
-      for line in iter(self.fd.readline, b''):
-        self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, line))
-      self.fd.close()
-      self.queue.put(_FileDescriptorStreamsThreads.QueueItem(self, b''))
-
-
 def symlink(source, link_name):
   """Creates a symbolic link pointing to source named link_name.
   Note: On Windows, source must exist on disk, as the implementation needs