import logging
import time
from threading import RLock
from enum import IntEnum, Enum, IntFlag
logger = logging.getLogger(__name__)
_NOT_FOUND = object()
class cached_field:
"""like functools.cached_property, but designed for Model
>>> class User:
... @cached_field()
... def playlists(self):
... return [1, 2]
...
>>> user = User()
>>> user2 = User()
>>> user.playlists = None
>>> user.playlists
[1, 2]
>>> user.playlists = [3, 4]
>>> user.playlists
[3, 4]
>>> user2.playlists
[1, 2]
"""
def __init__(self, ttl=None):
self._ttl = ttl
self.lock = RLock()
def __call__(self, func):
self.func = func
return self
def __get__(self, obj, owner):
if obj is None: # Class.field
return self
try:
# XXX: maybe we can use use a special attribute
# (such as _cached_{name}) to store the cache value
# instead of __dict__
cache = obj.__dict__
except AttributeError:
raise TypeError("obj should have __dict__ attribute") from None
cache_key = '_cache_' + self.func.__name__
datum = cache.get(cache_key, _NOT_FOUND)
if self._should_refresh_datum(datum):
with self.lock:
# check if another thread filled cache while we awaited lock
datum = cache.get(cache_key, _NOT_FOUND)
if self._should_refresh_datum(datum):
value = self.func(obj)
cache[cache_key] = datum = self._gen_datum(value)
return datum[1]
def __set__(self, obj, value):
cache_key = '_cache_' + self.func.__name__
obj.__dict__[cache_key] = self._gen_datum(value)
def _should_refresh_datum(self, datum):
return (
datum is _NOT_FOUND or # not initialized
datum[1] is None or # None implies that the value can be refreshed
datum[0] is not None and datum[0] < time.time() # expired
)
def _gen_datum(self, value):
if self._ttl is None:
expired_at = None
else:
expired_at = int(time.time()) + self._ttl
return (expired_at, value)
[docs]class ModelType(IntEnum):
dummy = 0
song = 1
artist = 2
album = 3
playlist = 4
lyric = 5
video = 6
user = 17
comment = 18
none = 128
class SearchType(Enum):
pl = 'playlist'
al = 'album'
ar = 'artist'
so = 'song'
vi = 'video'
@classmethod
def parse(cls, obj):
"""get member from object
:param obj: string or SearchType member
:return: SearchType member
>>> SearchType.parse('playlist')
<SearchType.pl: 'playlist'>
>>> SearchType.parse(SearchType.pl)
<SearchType.pl: 'playlist'>
>>> SearchType.parse('xxx')
Traceback (most recent call last):
...
ValueError: 'xxx' is not a valid SearchType value
"""
if isinstance(obj, SearchType):
return obj
type_aliases_map = {
cls.pl: ('playlist', 'pl'),
cls.al: ('album', 'al'),
cls.ar: ('artist', 'ar'),
cls.so: ('song', 'so'),
cls.vi: ('video', 'vi'),
}
for type_, aliases in type_aliases_map.items():
if obj in aliases:
return type_
raise ValueError("'%s' is not a valid SearchType value" % obj)
@classmethod
def batch_parse(cls, obj):
"""get list of member from obj
:param obj: obj can be string, list of string or list of member
:return: list of member
>>> SearchType.batch_parse('pl,ar')
[<SearchType.pl: 'playlist'>, <SearchType.ar: 'artist'>]
>>> SearchType.batch_parse(['pl', 'ar'])
[<SearchType.pl: 'playlist'>, <SearchType.ar: 'artist'>]
>>> SearchType.batch_parse('al')
[<SearchType.al: 'album'>]
>>> SearchType.batch_parse(SearchType.al)
[<SearchType.al: 'album'>]
>>> SearchType.batch_parse([SearchType.al])
[<SearchType.al: 'album'>]
"""
if isinstance(obj, SearchType):
return [obj]
if isinstance(obj, str):
return [cls.parse(s) for s in obj.split(',')]
return [cls.parse(s) for s in obj]
class ModelStage(IntEnum):
"""Model 所处的阶段,有大小关系
通过 create_by_display 工厂函数创建的实例,实例所处阶段为 display,
通过构造函数创建的实例,阶段为 inited, 如果 model 已经 get 过,
则阶段为 gotten.
目前,主要是 __getattribute__ 方法需要读取 model 所处的阶段,
避免重复 get model。
"""
display = 4
inited = 8
gotten = 16
class ModelExistence(IntEnum):
"""资源是否真的存在
在许多音乐平台,当一个歌手、专辑不存在时,它们的接口可能构造一个
id 为 0, name 为 None 的字典。这类 model.exists 应该被置为 no。
这个字段不应该被缓存。
"""
no = -1
unknown = 0
yes = 1
class ModelMetadata(object):
def __init__(self,
model_type=ModelType.dummy.value,
provider=None,
fields=None,
fields_display=None,
fields_no_get=None,
paths=None,
allow_get=False,
allow_batch=False,
**kwargs):
"""Model metadata class
:param allow_get: if get method is implemented
:param allow_batch: if list method is implemented
"""
self.model_type = model_type
self.provider = provider
self.fields = fields or []
self.fields_display = fields_display or []
self.fields_no_get = fields_no_get or []
self.paths = paths or []
self.allow_get = allow_get
self.allow_batch = allow_batch
for key, value in kwargs.items():
setattr(self, key, value)
class display_property:
"""Model 的展示字段的描述器"""
def __init__(self, name):
#: display 属性对应的真正属性的名字
self.name_real = name
#: 用来存储值的属性名
self.store_pname = '_display_store_' + name
def __get__(self, instance, _=None):
if instance is None:
return self
if instance.stage >= ModelStage.inited:
return getattr(instance, self.name_real)
return getattr(instance, self.store_pname, '')
def __set__(self, instance, value):
setattr(instance, self.store_pname, value)
class ModelMeta(type):
def __new__(cls, name, bases, attrs):
# 获取 Model 当前以及父类中的 Meta 信息
# 如果 Meta 中相同字段的属性,子类的值可以覆盖父类的
_metas = []
for base in bases:
base_meta = getattr(base, '_meta', None)
if base_meta is not None:
_metas.append(base_meta)
Meta = attrs.pop('Meta', None)
if Meta:
_metas.append(Meta)
kind_fields_map = {'fields': [],
'fields_display': [],
'fields_no_get': [],
'paths': []}
meta_kv = {} # 实例化 ModelMetadata 的 kv 对
for _meta in _metas:
for kind, fields in kind_fields_map.items():
fields.extend(getattr(_meta, kind, []))
for k, v in _meta.__dict__.items():
if k.startswith('_') or k in kind_fields_map:
continue
if k == 'model_type':
if ModelType(v) != ModelType.dummy:
meta_kv[k] = v
else:
meta_kv[k] = v
klass = type.__new__(cls, name, bases, attrs)
# update provider
provider = meta_kv.pop('provider', None)
model_type = meta_kv.pop('model_type', ModelType.dummy.value)
if provider and ModelType(model_type) != ModelType.dummy:
provider.set_model_cls(model_type, klass)
fields_all = list(set(kind_fields_map['fields']))
fields_display = list(set(kind_fields_map['fields_display']))
fields_no_get = list(set(kind_fields_map['fields_no_get']))
paths = list(set(kind_fields_map['paths']))
for field in fields_display:
setattr(klass, field + '_display', display_property(field))
# DEPRECATED attribute _meta
# TODO: remove this in verion 2.3
klass._meta = ModelMetadata(model_type=model_type,
provider=provider,
fields=fields_all,
fields_display=fields_display,
fields_no_get=fields_no_get,
paths=paths,
**meta_kv)
# FIXME: theoretically, different provider can share same model,
# so source field should be a instance attribute instead of class attribute.
# however, we don't have enough time to fix this whole design.
klass.source = provider.identifier if provider is not None else None
# use meta attribute instead of _meta
klass.meta = klass._meta
return klass
[docs]class AlbumType(Enum):
"""Album type enumeration
中文解释::
Single 和 EP 会有一些交集,在展示时,会在一起展示,比如 Singles & EPs。
Compilation 和 Retrospective 也会有交集,展示时,也通常放在一起,统称“合辑”。
References:
1. https://www.zhihu.com/question/22888388/answer/33255107
2. https://zh.wikipedia.org/wiki/%E5%90%88%E8%BC%AF
"""
standard = 'standard'
single = 'single'
ep = 'EP'
live = 'live'
compilation = 'compilation'
retrospective = 'retrospective'
[docs] @classmethod
def guess_by_name(cls, name):
"""guess album type by its name"""
# album name which contains following string are `Single`
# 1. ' - Single' 6+3=9
# 2. '(single)' 6+2=8
# 3. '(single)' 6+2=8
if 'single' in name[-9:].lower():
return cls.single
# ' - EP'
if 'ep' in name[-5:].lower():
return cls.ep
if 'live' in name or '演唱会' in name or \
'音乐会' in name:
return cls.live
# '精选集', '精选'
if '精选' in name[-3:]:
return cls.retrospective
return cls.standard
class Model(metaclass=ModelMeta):
"""base class for data models
Usage::
class User(Model):
class Meta:
fields = ['name', 'title']
user = UserModel(name='xxx')
assert user.name == 'xxx'
user2 = UserModel(user)
assert user2.name == 'xxx'
"""
def __init__(self, obj=None, **kwargs):
# ensure all field are initialized to None
for field in self.meta.fields:
setattr(self, field, None)
# copy fields from obj as many as possible
if obj is not None:
for field in obj.meta.fields:
value = object.__getattribute__(obj, field)
setattr(self, field, value)
for field in obj.meta.fields_display:
field_name = f'{field}_display'
field_display_prop = getattr(type(self), field_name)
field_display_prop.__set__(self, getattr(obj, field_name))
# source should be a instance attribute although it is not temporarily
self.source = obj.source
self.stage = obj.stage
self.exists = obj.exists
else:
for field in self.meta.fields:
setattr(self, field, None)
#: model 所处阶段。目前,通过构造函数初始化的 model
# 所处阶段为 inited,通过 get 得到的 model,所处阶段为 gotten,
# 通过 display 属性构造的 model,所处阶段为 display。
# 目前,此属性仅为 models 模块使用,不推荐外部依赖。
self.stage = kwargs.get('stage', ModelStage.inited)
#: 歌曲是否存在。如果 Model allow_get,但 get 却不能获取到 model,
# 则该 model 不存在。
self.exists = kwargs.get('stage', ModelExistence.unknown)
for k, v in kwargs.items():
if k in self.meta.fields:
setattr(self, k, v)
def __getattribute__(self, name):
"""
获取 model 某一属性时,如果该属性值为 None 且该属性是 field
且该属性允许触发 get 方法,这时,我们尝试通过获取 model
详情来初始化这个字段,于此同时,还会重新给部分 fields 重新赋值。
"""
cls = type(self)
cls_name = cls.__name__
value = object.__getattribute__(self, name)
if name in ('identifier', 'meta', '_meta', 'stage', 'exists'):
return value
if name in cls.meta.fields \
and name not in cls.meta.fields_no_get \
and value is None \
and cls.meta.allow_get \
and self.stage < ModelStage.gotten \
and self.exists != ModelExistence.no:
# debug snippet: show info of the caller that trigger the model.get call
#
# import inspect
# frame = inspect.currentframe()
# caller = frame.f_back
# logger.info(
# '%s %d %s',
# caller.f_code.co_filename, caller.f_lineno, caller.f_code.co_name
# )
logger.debug("Model {} {}'s value is None, try to get detail."
.format(repr(self), name))
obj = cls.get(self.identifier)
if obj is not None:
for field in cls.meta.fields:
# 类似 @property/@cached_field 等字段,都应该加入到
# fields_no_get 列表中
if field in cls.meta.fields_no_get:
continue
# 这里不能使用 getattr,否则有可能会无限 get
fv = object.__getattribute__(obj, field)
if fv is not None:
setattr(self, field, fv)
self.stage = ModelStage.gotten
self.exists = ModelExistence.yes
else:
self.exists = ModelExistence.no
logger.warning('Model {} get return None'.format(cls_name))
value = object.__getattribute__(self, name)
return value
@classmethod
def create_by_display(cls, identifier, **kwargs):
"""create model instance with identifier and display fields"""
model = cls(identifier=identifier)
model.stage = ModelStage.display
model.exists = ModelExistence.unknown
for k, v in kwargs.items():
if k in cls.meta.fields_display:
setattr(model, k + '_display', v)
return model
class ModelFlags(IntFlag):
none = 0x00000000
v1 = 0x00000001
v2 = 0x00000002
brief = 0x00000010
normal = brief | 0x00000020