feat: 支持kg源歌词获取

This commit is contained in:
helloplhm-qwq 2023-12-31 14:05:45 +08:00
parent 18fd6a4cb2
commit 0d21d71a61
No known key found for this signature in database
GPG Key ID: 6BE1B64B905567C7
5 changed files with 197 additions and 6 deletions

View File

@ -15,7 +15,7 @@ import zlib
import time import time
import re import re
import xmltodict import xmltodict
from urllib.parse import quote from urllib.parse import quote, unquote, urlparse
from hashlib import md5 as handleCreateMD5 from hashlib import md5 as handleCreateMD5
def createBase64Encode(data_bytes): def createBase64Encode(data_bytes):
@ -88,8 +88,35 @@ def unique_list(list_in):
return unique_list return unique_list
def encodeURIComponent(component): def encodeURIComponent(component):
if (isinstance(component, str)):
component = component.encode('utf-8')
elif (not isinstance(component, bytes)):
raise TypeError('component must be str or bytes')
return quote(component) return quote(component)
def decodeURIComponent(component):
return unquote(component)
def encodeURI(uri):
parse_result = urlparse(uri)
params = {}
for q in parse_result.query.split('&'):
k, v = q.split('=')
v = encodeURIComponent(v)
params[k] = v
query = '&'.join([f'{k}={v}' for k, v in params.items()])
return parse_result._replace(query=query).geturl()
def decodeURI(uri):
parse_result = urlparse(uri)
params = {}
for q in parse_result.query.split('&'):
k, v = q.split('=')
v = decodeURIComponent(v)
params[k] = v
query = '&'.join([f'{k}={v}' for k, v in params.items()])
return parse_result._replace(query=query).geturl()
def sortDict(dictionary): def sortDict(dictionary):
sorted_items = sorted(dictionary.items()) sorted_items = sorted(dictionary.items())
sorted_dict = {k: v for k, v in sorted_items} sorted_dict = {k: v for k, v in sorted_items}

View File

@ -88,7 +88,7 @@ async def url(source, songId, quality):
} }
try: try:
result = await func(songId, quality) result = await func(songId, quality)
logger.debug(f'获取{source}_{songId}_{quality}成功URL{result["url"]}') logger.info(f'获取{source}_{songId}_{quality}成功URL{result["url"]}')
canExpire = sourceExpirationTime[source]['expire'] canExpire = sourceExpirationTime[source]['expire']
expireTime = sourceExpirationTime[source]['time'] + int(time.time()) expireTime = sourceExpirationTime[source]['time'] + int(time.time())
@ -116,6 +116,43 @@ async def url(source, songId, quality):
}, },
}, },
} }
except FailedException as e:
logger.info(f'获取{source}_{songId}_{quality}失败,原因:' + e.args[0])
return {
'code': 2,
'msg': e.args[0],
'data': None,
}
async def lyric(source, songId, _):
cache = config.getCache('lyric', f'{source}_{songId}')
if cache:
return {
'code': 0,
'msg': 'success',
'data': cache['data']
}
try:
func = require('modules.' + source + '.lyric')
except:
return {
'code': 1,
'msg': '未知的源或不支持的方法',
'data': None,
}
try:
result = await func(songId)
config.updateCache('lyric', f'{source}_{songId}', {
"data": result,
"time": int(time.time() + (86400 * 3)), # 歌词缓存3天
"expire": True,
})
logger.debug(f'缓存已更新:{source}_{songId}, lyric: {result}')
return {
'code': 0,
'msg': 'success',
'data': result
}
except FailedException as e: except FailedException as e:
return { return {
'code': 2, 'code': 2,

View File

@ -12,6 +12,8 @@ from .musicInfo import getMusicSingerInfo as _getInfo2
from .musicInfo import getMusicInfo as _getInfo from .musicInfo import getMusicInfo as _getInfo
from .utils import tools from .utils import tools
from .player import url from .player import url
from .lyric import getLyric as _getLyric
from .lyric import lyricSearchByHash as _lyricSearch
from .mv import getMvInfo as _getMvInfo from .mv import getMvInfo as _getMvInfo
from .mv import getMvPlayURL as _getMvUrl from .mv import getMvPlayURL as _getMvUrl
from common.exceptions import FailedException from common.exceptions import FailedException
@ -69,3 +71,8 @@ async def mv(hash_):
res2 = res[1] res2 = res[1]
res1['play_info'] = res2 res1['play_info'] = res2
return res1 return res1
async def lyric(hash_):
lyric_search_result = await _lyricSearch(hash_)
choosed_lyric = lyric_search_result[0]
return await _getLyric(choosed_lyric['id'], choosed_lyric['accesskey'])

123
modules/kg/lyric.py Normal file
View File

@ -0,0 +1,123 @@
# ----------------------------------------
# - mode: python -
# - author: helloplhm-qwq -
# - name: lyric.py -
# - project: lx-music-api-server -
# - license: MIT -
# ----------------------------------------
# This file is part of the "lx-music-api-server" project.
from common.exceptions import FailedException
from common.utils import encodeURI, createBase64Decode
from .musicInfo import getMusicInfo
from common import Httpx
import ujson as json
import zlib
import re
class ParseTools:
def __init__(self):
self.head_exp = r'^.*\[id:\$\w+\]\n'
def parse(self, string):
string = string.replace('\r', '')
if re.match(self.head_exp, string):
string = re.sub(self.head_exp, '', string)
trans = re.search(r'\[language:([\w=\\/+]+)\]', string)
lyric = None
rlyric = None
tlyric = None
if trans:
string = re.sub(r'\[language:[\w=\\/+]+\]\n', '', string)
decoded_trans = createBase64Decode(trans.group(1)).decode('utf-8')
trans_json = json.loads(decoded_trans)
for item in trans_json['content']:
if item['type'] == 0:
rlyric = item['lyricContent']
elif item['type'] == 1:
tlyric = item['lyricContent']
self.i = 0
lxlyric = re.sub(r'\[((\d+),\d+)\].*', lambda x: self.process_lyric_match(x, rlyric, tlyric, self.i), string)
rlyric = '\n'.join(rlyric) if rlyric else ''
tlyric = '\n'.join(tlyric) if tlyric else ''
lxlyric = re.sub(r'<(\d+,\d+),\d+>', r'<\1>', lxlyric)
lyric = re.sub(r'<\d+,\d+>', '', lxlyric)
return {
'lyric': lyric,
'tlyric': tlyric,
'rlyric': rlyric,
'lxlyric': lxlyric
}
def process_lyric_match(self, match, rlyric, tlyric, i):
result = re.match(r'\[((\d+),\d+)\].*', match.group(0))
time = int(result.group(2))
ms = time % 1000
time /= 1000
m = str(int(time / 60)).zfill(2)
time %= 60
s = str(int(time)).zfill(2)
time_string = f'{m}:{s}.{ms}'
transformed_t = ''
if (tlyric):
for t in tlyric[i]:
transformed_t += t
tlyric[i] = transformed_t
if (rlyric):
nr = []
for r in rlyric[i]:
nr.append(r)
_tnr = ''.join(nr)
if (' ' in _tnr):
rlyric[i] = _tnr
else:
nr = []
for r in rlyric[i]:
nr.append(r.strip())
rlyric[i] = ' '.join(nr)
if rlyric:
rlyric[i] = f'[{time_string}]{rlyric[i] if rlyric[i] else ""}'.replace(' ', ' ')
if tlyric:
tlyric[i] = f'[{time_string}]{tlyric[i] if tlyric[i] else ""}'
self.i += 1
return re.sub(result.group(1), time_string, match.group(0))
global_parser = ParseTools()
def krcDecode(a:bytes):
encrypt_key = (64, 71, 97, 119, 94, 50, 116, 71, 81, 54, 49, 45, 206, 210, 110, 105)
content = a[4:] # krc1
compress_content = bytes(content[i] ^ encrypt_key[i % len(encrypt_key)] for i in range(len(content)))
text_bytes = zlib.decompress(bytes(compress_content))
text = text_bytes.decode("utf-8")
return text
async def lyricSearchByHash(hash_):
musicInfo = await getMusicInfo(hash_)
if (not musicInfo):
raise FailedException('歌曲信息获取失败')
hash_new = musicInfo['audio_info']['hash']
name = musicInfo['songname']
timelength = int(musicInfo['audio_info']['timelength']) // 1000
req = await Httpx.AsyncRequest(encodeURI(f'https://lyrics.kugou.com/search?ver=1&man=yes&client=pc&keyword=' +
name + '&hash=' + hash_new + '&timelength=' + str(timelength)), {
'method': 'GET',
})
body = req.json()
if (body['status'] != 200):
raise FailedException('歌词获取失败')
if (not body['candidates']):
raise FailedException('歌词获取失败: 当前歌曲无歌词')
return body['candidates']
async def getLyric(lyric_id, accesskey):
req = await Httpx.AsyncRequest(f'https://lyrics.kugou.com/download?ver=1&client=pc&id={lyric_id}&accesskey={accesskey}', {
'method': 'GET',
})
body = req.json()
if (body['status'] != 200 or body['error_code'] != 0 or (not body['content'])):
raise FailedException('歌词获取失败')
content = createBase64Decode(body['content'])
content = krcDecode(content)
return global_parser.parse(content)

View File

@ -7,13 +7,10 @@
# ---------------------------------------- # ----------------------------------------
# This file is part of the "lx-music-api-server" project. # This file is part of the "lx-music-api-server" project.
from .player import url
from .musicInfo import getMusicInfo as _getInfo from .musicInfo import getMusicInfo as _getInfo
from .utils import formatSinger from .utils import formatSinger
from .lyric import getLyric as _getLyric from .lyric import getLyric as _getLyric
from common import utils from common import utils
from . import refresh_login
async def info(songid): async def info(songid):
req = await _getInfo(songid) req = await _getInfo(songid)