# Source code for feeluown.utils.reader

import logging
from abc import ABCMeta, abstractmethod
from typing import (
    List,
    Generic,
    TypeVar,
    Optional,
    Callable,
    Tuple,
    Iterable,
    cast,
    Sequence,
    AsyncIterable,
)

# Module-level logger; readers in this module emit debug records through it.
logger = logging.getLogger(__name__)

# Names exported by ``from <this module> import *``.
__all__ = (
    'create_reader',
    'SequentialReader',
    'RandomSequentialReader',
    'Reader',
    'AsyncReader',
    # Below are deprecated.
    'RandomReader',
    'wrap',
)

# Type variable for the kind of object a reader yields.
T = TypeVar("T")


class ReaderException(Exception):
    """Root of the exception hierarchy defined by this module."""


class CantReadAll(ReaderException):
    """Raised by ``readall`` when a reader is unable to read every object."""


class Reader(Generic[T], metaclass=ABCMeta):
    """Abstract interface shared by every reader.

    The motivating use case is pagination::

        A provider usually splits a large resource into chunks. A playlist
        with 10k songs, for instance, takes several requests to fetch in
        full. Each provider exposes its own pagination API, and the Reader
        class gives callers one unified API on top of all of them.

    A ``read_*`` method may raise *ANY* exception. In practice the caller is
    expected to know which exceptions can happen. In the use case above,
    ProviderIOError is possible and other exceptions are not expected.
    """

    @property
    @abstractmethod
    def count(self) -> int:
        """Return the total number of objects this reader can read."""

    @abstractmethod
    def read_range(self, start: int, end: int) -> List[T]:
        """Return the objects in the half-open range ``[start, end)``."""

    @abstractmethod
    def read(self, index) -> T:
        """Return the object at position ``index``.

        :raises IndexError:
        """

    @abstractmethod
    def readall(self) -> List[T]:
        """Return every object.

        :raises CantReadAll:

        .. versionchanged:: 3.8.10
           CantReadAll is raised instead of ReadFailed. ReadFailed inherits
           from ProviderIOError, and Reader should not be coupled with
           ProviderIOError.
        """

    @abstractmethod
    def _read_next(self) -> T:
        """Return the next object. For internal usage only."""

    def __iter__(self):
        # A reader is its own iterator.
        return self

    def __next__(self) -> T:
        """Return the next object by delegating to ``_read_next``.

        .. versionchanged:: 3.8.10
           No exception is handled here. Handling exceptions is the
           responsibility of the caller.
        """
        return self._read_next()


class SequentialReader(Reader[T]):
    """Read data sequentially and lazily.

    We only want to launch a web request when the resource is really needed.
    Formerly a plain Python generator was used to get this lazy-read
    behaviour. However, no read meta info, such as the total count and the
    current offset, can be extracted from an ordinary generator.

    SequentialReader implements the iterator protocol, wraps the generator
    and stores the reader state.

    .. note::

         iterating may be a blocking operation.

    **Usage example**:

    >>> def fetch_songs(page=1, page_size=50):
    ...     return list(range(page * page_size,
    ...                       (page + 1) * page_size))
    ...
    >>> def create_songs_g():
    ...     page = 0
    ...     total_page = 2
    ...     page_size = 2
    ...
    ...     def g():
    ...         nonlocal page, page_size
    ...         while page < total_page:
    ...             for song in fetch_songs(page, page_size):
    ...                 yield song
    ...             page += 1
    ...
    ...     total = total_page * page_size
    ...     return SequentialReader(g(), total)
    ...
    >>> g = create_songs_g()
    >>> g.offset, g.count
    (0, 4)
    >>> next(g), next(g)
    (0, 1)
    >>> list(g)
    [2, 3]
    >>> g.offset, g.count
    (4, 4)

    .. versionadded:: 3.1
    """

    def __init__(self, g, count: Optional[int], offset: int = 0):
        """init

        :param g: Python generator
        :param offset: current offset
        :param count: total count. count can be None, which means the
            total count is unknown. When it is unknown, be CAREFUL
            to use list(reader).
        """
        super().__init__()
        self._g = g
        self._count = count
        self.offset = offset
        # Every object read so far, in the order it was produced by ``g``.
        self._objects: List[T] = []

    @property
    def count(self):
        """Total number of objects, or None while the total is unknown."""
        return self._count

    def readall(self) -> List[T]:
        """Drain the generator and return every object read by this reader.

        :raises CantReadAll: when the total count is unknown.
        """
        if self._count is None:
            raise CantReadAll("can't readall when count is unknown")
        # Iterating stores each object in self._objects as a side effect.
        for _ in self:
            pass
        return self._objects

    def read_range(self, start, end) -> List[T]:
        """Return the objects in range [start, end).

        Objects are read from the generator until ``end`` objects are
        available or the reader is exhausted, so the result may be shorter
        than ``end - start``.
        """
        assert 0 <= start < end
        while len(self._objects) < end:
            try:
                next(self)
            except StopIteration:
                break
        return self._objects[start:end]

    def read(self, index):
        """Return the object at ``index``, reading forward when needed.

        :raises IndexError: when the reader is exhausted before ``index``.
        """
        self.read_range(index, index + 1)
        return self._objects[index]

    def _read_next(self) -> T:
        """Read one more object from the generator and record it.

        :raises StopIteration: when ``offset`` has reached a known count or
            the generator is exhausted.
        """
        if self._count is not None and self.offset >= self._count:
            raise StopIteration
        try:
            obj = next(self._g)
        except StopIteration:
            if self._count is None:
                # The generator is exhausted: the number of objects read so
                # far *is* the total, so that offset == count at the end,
                # exactly as in the known-count case.
                self._count = self.offset
            raise
        self.offset += 1
        self._objects.append(obj)
        return obj
class RandomSequentialReader(Reader[T]):
    """Reader that fetches objects by range, on demand, through ``read_func``.

    Objects which have been read are cached in a list that is pre-sized to
    ``count``; a ``None`` slot means "not read yet".
    """

    def __init__(self, count, read_func: Callable[[int, int], Iterable[T]],
                 max_per_read=100):
        """random reader constructor

        :param int count: total number of objects
        :param function read_func: func(start: int, end: int) -> list
        :param int max_per_read: max count per read, it must big than 0
        """
        self.offset = 0
        self._count = count
        # Sorted, non-overlapping [start, end) ranges that have been read.
        self._ranges: List[Tuple[int, int]] = []
        self._objects: List[Optional[T]] = [None] * count
        self._read_func = read_func

        assert max_per_read > 0, 'max_per_read must big than 0'
        self._max_per_read = max_per_read

    @property
    def count(self):
        """Total number of objects."""
        return self._count

    def read(self, index):
        """read object by index

        If the object is not already read, this method may trigger IO
        operation. Negative indexes are not validated here.

        :raises IndexError: when ``index`` is not less than ``count``.
        """
        if index >= self._count:
            # Fail early, before computing a range with start > end and
            # before issuing a useless read_func call.
            raise IndexError('reader index out of range')
        yes, r = self._has_index(index)
        if not yes:
            self._read_range(*r)
        return self._objects[index]

    def readall(self) -> List[T]:
        """read all objects

        Objects are fetched in chunks of at most ``max_per_read``. Any
        exception raised by ``read_func`` propagates to the caller.

        :return list: list of objects
        """
        # all objects have been read
        if len(self._ranges) == 1 and self._ranges[0] == (0, self._count):
            return cast(List[T], self._objects)

        start = 0
        end = 0
        count = self._count
        while end < count:
            end = min(count, end + self._max_per_read)
            self._read_range(start, end)
            start = end
        return cast(List[T], self._objects)

    def read_range(self, start, end) -> List[T]:
        """Read range [start, end) through ``read_func`` and return it."""
        self._read_range(start, end)
        return cast(List[T], self._objects[start:end])

    def _read_range(self, start, end):
        """Call ``read_func(start, end)`` and cache whatever it returns."""
        # TODO: make this method thread safe
        assert start <= end, 'start should less than end'
        logger.debug('trigger read_func(%d, %d)', start, end)
        objs = list(self._read_func(start, end))
        # read_func may return fewer objects than requested, so only the
        # slots that actually got a value are overwritten.
        actual = len(objs)
        self._objects[start:start + actual] = objs
        self._refresh_ranges()

    def _has_index(self, index):
        """Tell whether ``index`` has been read and which range covers it.

        :return: ``(has_been_read, (left, right))``. When the index has been
            read, ``[left, right)`` is the cached range containing it;
            otherwise it is the range which should be read to get it.
        """
        has_been_read = False
        left_index = right_index = None  # [left, right) -> range to read
        gt_index = None  # end of the closest cached range before index
        for r in self._ranges:
            start, end = r
            if index < start:
                # [, gt_index) index [start, end)
                gt_index = gt_index if gt_index is not None else 0
                left_index = index
                right_index = min(start, index + self._max_per_read)
                # trick: read as much as possible at a time to improve performance
                if start - gt_index <= self._max_per_read:
                    left_index = gt_index
                    right_index = start
                break
            # found index
            elif start <= index < end:
                has_been_read = True
                left_index, right_index = start, end
                break
            else:
                gt_index = end
        else:
            # default read range [index, min(index + max_per_read, self._count))
            left_index = index
            right_index = min(index + self._max_per_read, self._count)
            # trick: read as much as possible at a time to improve performance
            if gt_index is not None:
                if self._count - gt_index < self._max_per_read:
                    left_index = gt_index
                    right_index = self._count
        return has_been_read, (left_index, right_index)

    def _refresh_ranges(self):
        """Rebuild ``self._ranges`` from the runs of non-None cached objects."""
        ranges = []
        start = None
        for i, obj in enumerate(self._objects):
            if start is None and obj is not None:
                start = i
                continue
            if start is not None and obj is None:
                ranges.append((start, i))
                start = None
        if start is not None:
            ranges.append((start, len(self._objects)))
        self._ranges = ranges

    def _read_next(self):
        """Read the object at the current offset and advance the offset.

        :raises StopIteration: when every object has been iterated.
        """
        if self.offset >= self._count:
            raise StopIteration
        obj = self.read(self.offset)
        self.offset += 1
        return obj
RandomReader = RandomSequentialReader  # For backward compatibility.


class AsyncReader:
    """Async version of reader.

    .. versionadded:: 3.8.10
    """


class AsyncSequentialReader(AsyncReader):
    """Async reader which wraps an async generator and keeps the read state."""

    def __init__(self, g, count, offset=0):
        """init

        :param g: Python async generator (it is driven with ``asend(None)``)
        :param offset: current offset
        :param count: total count. count can be None, which means the
            total count is unknown. When it is unknown, be CAREFUL
            to iterate the whole reader.
        """
        self._g = g
        self._count = count
        self.offset = offset
        # Every object read so far, in the order it was produced by ``g``.
        self._objects = []

    @property
    def count(self):
        """Total number of objects, or None while the total is unknown."""
        return self._count

    async def a_readall(self):
        """Drain the generator and return every object read by this reader.

        :raises CantReadAll: when the total count is unknown.
        """
        if self._count is None:
            raise CantReadAll("can't readall when count is unknown")
        # Iterating stores each object in self._objects as a side effect.
        async for _ in self:
            pass
        return self._objects

    async def a_read_next(self):
        """Read one more object from the generator and record it.

        :raises StopAsyncIteration: when ``offset`` has reached a known
            count or the generator is exhausted.
        """
        if self._count is not None and self.offset >= self._count:
            raise StopAsyncIteration
        try:
            obj = await self._g.asend(None)
        except StopAsyncIteration:
            if self._count is None:
                # The generator is exhausted: the number of objects read so
                # far *is* the total, so that offset == count at the end.
                self._count = self.offset
            raise
        self.offset += 1
        self._objects.append(obj)
        return obj

    def __aiter__(self):
        return self

    async def __anext__(self):
        return await self.a_read_next()


def wrap(iterable):
    """Wrap an iterable with a suitable reader.

    A reader is returned unchanged; an async iterable gets an
    AsyncSequentialReader, a sequence gets a RandomSequentialReader and any
    other iterable gets a SequentialReader with an unknown count.

    :raises TypeError: when ``iterable`` is neither iterable nor async iterable.

    .. versionadded:: 3.4
    .. deprecated:: 3.7.7
        Use :func:`create_reader` instead.
    """
    # if it is a reader already, just return it
    # (an AsyncReader also matches AsyncIterable below, so it must be
    # caught here to avoid wrapping a reader inside another reader)
    if isinstance(iterable, (Reader, AsyncReader)):
        return iterable

    # async reader
    if isinstance(iterable, AsyncIterable):
        return AsyncSequentialReader(iterable, count=None)

    if not isinstance(iterable, Iterable):
        raise TypeError(f'must be a Iterable, got {type(iterable)}')

    if isinstance(iterable, Sequence):
        count = len(iterable)
        # The whole sequence is in memory, so it can be read in one go;
        # max(count, 1) keeps max_per_read positive for an empty sequence.
        return RandomSequentialReader(count,
                                      lambda start, end: iterable[start:end],
                                      max_per_read=max(count, 1))
    # maybe a generator/iterator
    return SequentialReader(iterable, count=None)


def create_reader(iterable):
    """Create a reader from an iterable.

    >>> reader = create_reader([1, 2])
    >>> isinstance(reader, RandomSequentialReader)
    True
    >>> reader.readall()
    [1, 2]
    >>> isinstance(create_reader(iter([])), SequentialReader)
    True
    >>> create_reader(None)
    Traceback (most recent call last):
        ...
    TypeError: must be a Iterable, got <class 'NoneType'>

    .. versionadded:: 3.7.7
    """
    return wrap(iterable)