Source code for feeluown.utils.reader

"""
A provider often splits a resource with a large body into chunks; for example,
for a large playlist with 10k songs, the client needs to send several requests
to fetch the whole playlist. Generally, this design is called Pagination.
Moreover, different providers have different pagination APIs.
For feeluown, we want a unified API, so we create the Reader class.
"""
import logging
from collections.abc import Iterable, Sequence, AsyncIterable

from feeluown.excs import ReadFailed, ProviderIOError

logger = logging.getLogger(__name__)

__all__ = (
    'SequentialReader',
    'RandomReader',
    'RandomSequentialReader',
    'wrap',
)
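
# An illustrative sketch, not part of the module, of the unified API described
# in the module docstring: the caller just iterates a reader and does not care
# how the provider paginates. ``fetch_page`` and ``playlist_id`` are
# hypothetical names.
#
#     def create_playlist_songs_reader(playlist_id, page_size=50):
#         def g():
#             page = 0
#             while True:
#                 songs = fetch_page(playlist_id, page, page_size)
#                 if not songs:
#                     return
#                 yield from songs
#                 page += 1
#         # the total count is unknown here, so pass count=None
#         return SequentialReader(g(), count=None)
#
#     for song in create_playlist_songs_reader('playlist-id'):
#         ...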


class Reader:
    """Reader base class"""

    allow_sequential_read = False
    allow_random_read = False
    is_async = False

    def __init__(self):
        self._objects = []


class SequentialReadMixin:

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return self.read_next()
        except StopIteration:
            raise
        # TODO: caller should not crash when reader raises other exceptions
        except Exception as e:
            raise ProviderIOError('read next obj failed') from e


class AsyncSequntialReadMixin:
    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self.a_read_next()
        except StopAsyncIteration:
            raise
        # TODO: caller should not crash when reader raises other exceptions
        except Exception as e:
            raise ProviderIOError('read next obj failed') from e


class BaseSequentialReader(Reader):
    allow_sequential_read = True

    def __init__(self, g, count, offset=0):
        """init

        :param g: Python generator
        :param count: total count. count can be None, which means the
                      total count is unknown. When it is unknown, be
                      CAREFUL when using list(reader).
        :param offset: current offset
        """
        super().__init__()
        self._g = g
        self.count = count
        self.offset = offset


class SequentialReader(BaseSequentialReader, SequentialReadMixin):
    """Help you read data sequentially

    We only want to launch a web request when we need the resource.
    Formerly, we used a Python generator to achieve this lazy-read
    feature. However, we can't extract any read meta info, such as
    total count and current offset, from an ordinary generator.

    SequentialReader implements the iterator protocol, wraps the
    generator and stores the reader state.

    .. note::

        Iterating may be a blocking operation.

    **Usage example**:

    >>> def fetch_songs(page=1, page_size=50):
    ...     return list(range(page * page_size,
    ...                       (page + 1) * page_size))
    ...
    >>> def create_songs_g():
    ...     page = 0
    ...     total_page = 2
    ...     page_size = 2
    ...
    ...     def g():
    ...         nonlocal page, page_size
    ...         while page < total_page:
    ...             for song in fetch_songs(page, page_size):
    ...                 yield song
    ...             page += 1
    ...
    ...     total = total_page * page_size
    ...     return SequentialReader(g(), total)
    ...
    >>> g = create_songs_g()
    >>> g.offset, g.count
    (0, 4)
    >>> next(g), next(g)
    (0, 1)
    >>> list(g)
    [2, 3]
    >>> g.offset, g.count
    (4, 4)

    .. versionadded:: 3.1
    """

    def readall(self):
        if self.count is None:
            raise ReadFailed("can't readall when count is unknown")
        list(self)
        return self._objects

    def read_next(self):
        if self.count is None or self.offset < self.count:
            try:
                obj = next(self._g)
            except StopIteration:
                if self.count is None:
                    self.count = self.offset + 1
                raise
        else:
            raise StopIteration
        self.offset += 1
        self._objects.append(obj)
        return obj


class RandomReader(Reader):
    allow_random_read = True

    def __init__(self, count, read_func, max_per_read):
        """random reader constructor

        :param int count: total number of objects
        :param function read_func: func(start: int, end: int) -> list
        :param int max_per_read: max count per read, it must be greater than 0
        """
        super().__init__()
        self.count = count
        self._ranges = []  # list of tuple
        self._objects = [None] * count
        self._read_func = read_func

        assert max_per_read > 0, 'max_per_read must be greater than 0'
        self._max_per_read = max_per_read

    def read(self, index):
        """read object by index

        If the object has not been read yet, this method may trigger an
        IO operation.

        :raises ReadFailed: when the IO operation fails
        """
        yes, r = self._has_index(index)
        if yes:
            return self._objects[index]
        self._read_range(*r)
        return self._objects[index]

    def readall(self):
        """read all objects

        :return list: list of objects
        :raises ReadFailed:
        """
        # all objects have been read
        if len(self._ranges) == 1 and self._ranges[0] == (0, self.count):
            return self._objects
        start = 0
        end = 0
        count = self.count
        while end < count:
            end = min(count, end + self._max_per_read)
            self._read_range(start, end)
            start = end
        return self._objects

    # def explain_readall(self):
    #     read_times = self.count / self._max_per_read
    #     if self.count % self._max_per_read > 0:
    #         read_times += 1
    #     return {'count': self.count,
    #             'max_per_read': self._max_per_read,
    #             'read_times': read_times}

    def _read_range(self, start, end):
        # TODO: make this method thread safe
        assert start <= end, "start should be less than or equal to end"
        try:
            logger.info('trigger read_func(%d, %d)', start, end)
            objs = self._read_func(start, end)
        except:  # noqa: E722
            raise ReadFailed('read_func raised an error')
        else:
            expected = end - start
            actual = len(objs)
            if expected != actual:
                raise ReadFailed('read_func returns unexpected number of objects: '
                                 'expected={}, actual={}'
                                 .format(expected, actual))
            self._objects[start:end] = objs
            self._refresh_ranges()

    def _has_index(self, index):
        has_been_read = False
        left_index = right_index = None  # [left, right) -> range to read
        gt_index = None
        for r in self._ranges:
            start, end = r
            if index < start:
                # [, gt_index) index [start, end)
                gt_index = gt_index if gt_index is not None else 0
                left_index = index
                right_index = min(start, index + self._max_per_read)
                # trick: read as much as possible at a time to improve performance
                if start - gt_index <= self._max_per_read:
                    left_index = gt_index
                    right_index = start
                break
            # found index
            elif start <= index < end:
                has_been_read = True
                left_index, right_index = start, end
                break
            else:
                gt_index = end
        else:
            # default read range [index, min(index + max_per_read, self.count))
            left_index = index
            right_index = min(index + self._max_per_read, self.count)
            # trick: read as much as possible at a time to improve performance
            if gt_index is not None:
                if self.count - gt_index < self._max_per_read:
                    left_index = gt_index
                    right_index = self.count
        return has_been_read, (left_index, right_index)

    def _refresh_ranges(self):
        ranges = []
        start = None
        for i, obj in enumerate(self._objects):
            if start is None and obj is not None:
                start = i
                continue
            if start is not None and obj is None:
                ranges.append((start, i))
                start = None
        if start is not None:
            ranges.append((start, len(self._objects)))
        self._ranges = ranges


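# A minimal usage sketch, not part of the module; the data below is illustrative.
# RandomReader only needs a ``read_func(start, end) -> list`` that returns the
# objects in the half-open range [start, end).
#
#     songs = list(range(10))                      # pretend this is remote data
#     reader = RandomReader(
#         count=len(songs),
#         read_func=lambda start, end: songs[start:end],
#         max_per_read=3,
#     )
#     reader.read(5)     # triggers one read_func call, returns 5
#     reader.readall()   # walks [0, count) in chunks of max_per_read, returns all 10

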
class RandomSequentialReader(RandomReader, SequentialReadMixin):
    """random reader which supports sequential read"""

    allow_sequential_read = True

    def __init__(self, count, read_func, max_per_read=100):
        super().__init__(count, read_func, max_per_read=max_per_read)
        self.offset = 0

    def read_next(self):
        if self.offset >= self.count:
            raise StopIteration
        obj = self.read(self.offset)
        self.offset += 1
        return obj


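# A minimal sketch, not part of the module: RandomSequentialReader supports both
# random access and iteration. It is also what ``wrap``/``create_reader`` build
# for plain sequences.
#
#     reader = RandomSequentialReader(3, lambda start, end: list(range(start, end)))
#     reader.read(2)      # -> 2 (random access, triggers one read_func call)
#     next(iter(reader))  # -> 0 (sequential access via the iterator protocol)
#     reader.readall()    # -> [0, 1, 2]

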
class AsyncSequentialReader(BaseSequentialReader, AsyncSequntialReadMixin):
    is_async = True

    async def a_readall(self):
        if self.count is None:
            raise ReadFailed("can't readall when count is unknown")
        async for _ in self:
            pass
        return self._objects

    async def a_read_next(self):
        if self.count is None or self.offset < self.count:
            try:
                obj = await self._g.asend(None)
            except StopAsyncIteration:
                if self.count is None:
                    self.count = self.offset + 1
                raise
        else:
            raise StopAsyncIteration
        self.offset += 1
        self._objects.append(obj)
        return obj


def wrap(iterable):
    """create a reader from an iterable

    >>> reader = wrap([1, 2])
    >>> isinstance(reader, RandomSequentialReader)
    True
    >>> reader.readall()
    [1, 2]
    >>> isinstance(wrap(iter([])), SequentialReader)
    True
    >>> wrap(None)
    Traceback (most recent call last):
        ...
    TypeError: must be a Iterable, got <class 'NoneType'>

    .. versionadded:: 3.4
    .. deprecated:: 3.7.7
        Use :func:`create_reader` instead.
    """
    # if it is a reader already, just return it
    if isinstance(iterable, Reader):
        return iterable

    # async reader
    if isinstance(iterable, AsyncIterable):
        return AsyncSequentialReader(iterable, count=None)

    if not isinstance(iterable, Iterable):
        raise TypeError("must be a Iterable, got {}".format(type(iterable)))

    if isinstance(iterable, Sequence):
        count = len(iterable)
        return RandomSequentialReader(count,
                                      lambda start, end: iterable[start:end],
                                      max_per_read=max(count, 1))

    # maybe a generator/iterator
    return SequentialReader(iterable, count=None)


def create_reader(iterable):
    """
    .. versionadded:: 3.7.7
    """
    return wrap(iterable)
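

# A minimal sketch, not part of the module, showing the async path: passing an
# async iterable to ``create_reader``/``wrap`` yields an AsyncSequentialReader,
# which is consumed with ``async for``.
#
#     import asyncio
#
#     async def gen():
#         for i in range(3):
#             yield i
#
#     async def main():
#         reader = create_reader(gen())     # -> AsyncSequentialReader
#         return [obj async for obj in reader]
#
#     asyncio.run(main())                   # -> [0, 1, 2]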