feat: 支持tx/kg源歌曲信息获取

This commit is contained in:
helloplhm-qwq 2023-12-16 18:36:27 +08:00
parent 34eb8403b5
commit c01d1ed9fb
No known key found for this signature in database
GPG Key ID: 6BE1B64B905567C7
11 changed files with 571 additions and 277 deletions

View File

@ -132,5 +132,26 @@ def dump_xml(data):
def load_xml(data):
return xmltodict.parse(data)
def sizeFormat(size):
if size < 1024:
return f"{size}B"
elif size < 1024**2:
return f"{round(size / 1024, 2)}KB"
elif size < 1024**3:
return f"{round(size / 1024**2, 2)}MB"
elif size < 1024**4:
return f"{round(size / 1024**3, 2)}GB"
elif size < 1024**5:
return f"{round(size / 1024**4, 2)}TB"
else:
return f"{round(size / 1024**5, 2)}PB"
def timeLengthFormat(t):
t = int(t)
hour = t // 3600
minute = (t % 3600) // 60
second = t % 60
return f"{((('0' + str(hour)) if (len(str(hour)) == 1) else str(hour)) + ':') if (hour > 0) else ''}{minute:02}:{second:02}"
addToGlobalNamespace('require', require)

View File

@ -14,11 +14,11 @@ from common import config
from common import lxsecurity
from common import log
from common import Httpx
from modules import handleApiRequest
from aiohttp.web import Response
import ujson as json
import threading
import traceback
import modules
import time
def handleResult(dic, status = 200):
@ -91,8 +91,8 @@ async def handle(request):
return handleResult({"code": 1, "msg": "lxm请求头验证失败", "data": None}, 403)
try:
return handleResult(await handleApiRequest(method, source, songId, quality))
except Exception as e:
return handleResult(await getattr(modules, method)(source, songId, quality))
except:
logger.error(traceback.format_exc())
return handleResult({'code': 4, 'msg': '内部服务器错误', 'data': None}, 500)
@ -105,6 +105,7 @@ app.router.add_get('/', main)
# api
app.router.add_get('/{method}/{source}/{songId}/{quality}', handle)
app.router.add_get('/{method}/{source}/{songId}', handle)
# 404
app.router.add_route('*', '/{tail:.*}', handle_404)

View File

@ -48,7 +48,14 @@ sourceExpirationTime = {
}
async def handleApiRequest(command, source, songId, quality):
async def url(source, songId, quality):
if (not quality):
return {
'code': 2,
'msg': '需要参数"quality"',
'data': None,
}
try:
cache = config.getCache('urls', f'{source}_{songId}_{quality}')
if cache:
@ -68,11 +75,11 @@ async def handleApiRequest(command, source, songId, quality):
except:
logger.error(traceback.format_exc())
try:
func = require('modules.' + source + '.' + command)
func = require('modules.' + source + '.url')
except:
return {
'code': 1,
'msg': '未知的源或命令',
'msg': '未知的源或不支持的方法',
'data': None,
}
try:
@ -110,3 +117,26 @@ async def handleApiRequest(command, source, songId, quality):
'msg': e.args[0],
'data': None,
}
async def info(source, songid, _):
try:
func = require('modules.' + source + '.info')
except:
return {
'code': 1,
'msg': '未知的源或不支持的方法',
'data': None,
}
try:
result = await func(songid)
return {
'code': 0,
'msg': 'success',
'data': result
}
except FailedException as e:
return {
'code': 2,
'msg': e.args[0],
'data': None,
}

View File

@ -7,161 +7,55 @@
# ----------------------------------------
# This file is part of the "lx-music-api-server" project.
from .musicInfo import getMusicSingerInfo as _getInfo2
from .musicInfo import getMusicInfo as _getInfo
from .utils import tools
from .player import url
from common.exceptions import FailedException
from common import utils
from common import config
from common import Httpx
import ujson as json
import time
from common import utils
import asyncio
createObject = utils.CreateObject
def buildSignatureParams(dictionary, body = ""):
joined_str = ''.join([f'{k}={v}' for k, v in dictionary.items()])
return joined_str + body
def buildRequestParams(dictionary):
joined_str = '&'.join([f'{k}={v}' for k, v in dictionary.items()])
return joined_str
tools = createObject({
"signkey": config.read_config("module.kg.client.signatureKey"),
"pidversec": config.read_config("module.kg.client.pidversionsecret"),
"clientver": config.read_config("module.kg.client.clientver"),
"x-router": config.read_config("module.kg.tracker.x-router"),
"url": config.read_config("module.kg.tracker.host") + config.read_config("module.kg.tracker.path"),
"version": config.read_config("module.kg.tracker.version"),
"userid": config.read_config("module.kg.user.userid"),
"token": config.read_config("module.kg.user.token"),
"mid": config.read_config("module.kg.user.mid"),
"extra_params": config.read_config("module.kg.tracker.extra_params"),
"appid": config.read_config("module.kg.client.appid"),
'qualityHashMap': {
'128k': 'hash_128',
'320k': 'hash_320',
'flac': 'hash_flac',
'flac24bit': 'hash_high',
'master': 'hash_128',
},
'qualityMap': {
'128k': '128',
'320k': '320',
'flac': 'flac',
'flac24bit': 'high',
'master': 'viper_atmos',
},
})
def sign(params, body = ""):
params = utils.sortDict(params)
params = buildSignatureParams(params, body)
return utils.createMD5(tools["signkey"] + params + tools["signkey"])
def signRequest(url, params, options):
params = utils.mergeDict(tools["extra_params"], params)
url = url + "?" + buildRequestParams(params) + "&signature=" + sign(params, options.get("body") if options.get("body") else (options.get("data") if options.get("data") else ""))
return Httpx.request(url, options)
def getKey(hash_):
return utils.createMD5(hash_.lower() + tools.pidversec + tools.appid + tools.mid + tools.userid)
async def getMusicInfo(hash_):
tn = int(time.time())
url = "http://gateway.kugou.com/v3/album_audio/audio"
options = {
"method": "POST",
"headers": {
"KG-THash": "13a3164",
"KG-RC": "1",
"KG-Fake": "0",
"KG-RF": "00869891",
"User-Agent": "Android712-AndroidPhone-11451-376-0-FeeCacheUpdate-wifi",
"x-router": "kmr.service.kugou.com",
},
"data": {
"area_code": "1",
"show_privilege": "1",
"show_album_info": "1",
"is_publish": "",
"appid": 1005,
"clientver": 11451,
"mid": tools.mid,
"dfid": "-",
"clienttime": tn,
"key": 'OIwlieks28dk2k092lksi2UIkp',
"fields": "audio_info,album_info,album_audio_id",
"data": [
{
"hash": hash_
}
]
},
'cache': 86400 * 15,
'cache-ignore': [tn]
}
options['body'] = json.dumps(options['data']).replace(', ', ',').replace(': ', ':')
return Httpx.request(url, dict(options)).json()['data'][0][0]
async def url(songId, quality):
songId = songId.lower()
body_ = await getMusicInfo(songId)
thash = body_['audio_info'][tools.qualityHashMap[quality]]
albumid = body_['album_info']['album_id'] if (body_.get('album_info') and body_['album_info'].get('album_id')) else None
albumaudioid = body_['album_audio_id'] if (body_.get('album_audio_id')) else None
if (not thash):
raise FailedException('获取歌曲信息失败')
if (not albumid):
albumid = ""
if (not albumaudioid):
albumaudioid = ""
thash = thash.lower()
params = {
'album_id': albumid,
'userid': tools.userid,
'area_code': 1,
'hash': thash,
'module': '',
'mid': tools.mid,
'appid': tools.appid,
'ssa_flag': 'is_fromtrack',
'clientver': tools.clientver,
'open_time': time.strftime("%Y%m%d"),
'vipType': 6,
'ptype': 0,
'token': tools.token,
'auth': '',
'mtype': 0,
'album_audio_id': albumaudioid,
'behavior': 'play',
'clienttime': int(time.time()),
'pid': 2,
'key': getKey(thash),
'dfid': '-',
'pidversion': 3001
}
if (tools.version == 'v5'):
params['quality'] = tools.qualityMap[quality]
if (tools.version == "v4"):
params['version'] = tools.clientver
headers = createObject({
'User-Agent': 'Android712-AndroidPhone-8983-18-0-NetMusic-wifi',
'KG-THash': '3e5ec6b',
'KG-Rec': '1',
'KG-RC': '1',
})
if (tools['x-router']['enable']):
headers['x-router'] = tools['x-router']['value']
req = signRequest(tools.url, params, {'headers': headers})
body = createObject(req.json())
if body.status == 3:
raise FailedException('该歌曲在酷狗没有版权,请换源播放')
elif body.status == 2:
raise FailedException('链接获取失败,请检查账号是否有会员或数字专辑是否购买')
elif body.status != 1:
raise FailedException('链接获取失败可能是数字专辑或者api失效')
async def info(hash_):
tasks = []
tasks.append(_getInfo(hash_))
tasks.append(_getInfo2(hash_))
tasks.append(Httpx.request('http://mobilecdnbj.kugou.com/api/v3/song/info?hash=' + hash_, {
'method': 'GET'
}))
res = await asyncio.gather(*tasks)
res1 = res[0]
res2 = res[1]
mvhash = res[2].json()['data']['mvhash'] if (res[2].json()['data']) else ''
file_info = {}
for k, v in tools['qualityHashMap'].items():
if (res1['audio_info'][v] and k != 'master'):
file_info[k] = {
'hash': res1['audio_info'][v],
'size': utils.sizeFormat(int(res1['audio_info'][v.replace('hash', 'filesize')])),
}
if (isinstance(res1, type(None))):
raise FailedException('获取歌曲信息失败,请检查歌曲是否存在')
return {
'url': body.url[0],
'quality': quality
}
'name': res1['songname'],
'name_ori': res1['ori_audio_name'],
'name_extra': res1['songname'].replace(res1['ori_audio_name'], '').strip(),
'singer': res1['author_name'],
'singer_list': res2,
'format_length': utils.timeLengthFormat(int(res1['audio_info']['timelength']) / 1000),
'length': int(res1['audio_info']['timelength']) / 1000,
'hash': res1['audio_info']['hash'],
'file_info': file_info,
'songmid': res1['audio_id'],
'album_id': res1['album_info']['album_id'],
'album': res1['album_info']['album_name'],
'bpm': res1['bpm'],
'language': res1['language'],
'cover': res1['album_info']['sizable_cover'].format(size = 1080),
'sizable_cover': res1['album_info']['sizable_cover'],
'publish_date': res1['publish_date'],
'mvid': mvhash,
'genre': []
}

97
modules/kg/musicInfo.py Normal file
View File

@ -0,0 +1,97 @@
# ----------------------------------------
# - mode: python -
# - author: helloplhm-qwq -
# - name: musicInfo.py -
# - project: lx-music-api-server -
# - license: MIT -
# ----------------------------------------
# This file is part of the "lx-music-api-server" project.
from common.utils import createMD5
from common import Httpx
from .utils import tools, signRequest
import random
import ujson as json
import time
async def getMusicInfo(hash_, use_cache = True):
tn = int(time.time())
url = "http://gateway.kugou.com/v3/album_audio/audio"
options = {
"method": "POST",
"headers": {
"KG-THash": "13a3164",
"KG-RC": "1",
"KG-Fake": "0",
"KG-RF": "00869891",
"User-Agent": "Android712-AndroidPhone-11451-376-0-FeeCacheUpdate-wifi",
"x-router": "kmr.service.kugou.com",
},
"data": {
"area_code": "1",
"show_privilege": "1",
"show_album_info": "1",
"is_publish": "",
"appid": 1005,
"clientver": 11451,
"mid": tools.mid,
"dfid": "-",
"clienttime": tn,
"key": 'OIlwlieks28dk2k092lksi2UIkp',
"fields": "",
"data": [
{
"hash": hash_
}
]
},
'cache': 86400 * 30 if use_cache else 'no-cache',
'cache-ignore': [tn]
}
options['body'] = json.dumps(options['data']).replace(', ', ',').replace(': ', ':')
body = Httpx.request(url, dict(options)).json()
return body['data'][0][0] if (body['data'] and body['data'][0]) else {}
async def getMusicSingerInfo(hash_, use_cache = True):
# https://expendablekmrcdn.kugou.com/container/v2/image?album_image_type=-3&appid=1005&author_image_type=4%2C5&clientver=12029&count=5&data=%5B%7B%22mixSongId%22%3A452960726%2C%22album_id%22%3A62936873%2C%22hash%22%3A%2241f45664e8235b786990cbf213cd4725%22%2C%22filename%22%3A%22%E8%A2%81%E5%B0%8F%E8%91%B3%E3%80%81%E9%98%BF%E8%BE%B0%EF%BC%88%E9%98%8E%E8%BE%B0%EF%BC%89%20-%20%E5%8C%96%E4%BD%9C%E7%83%9F%E7%81%AB%E4%B8%BA%E4%BD%A0%E5%9D%A0%E8%90%BD%22%2C%22album_audio_id%22%3A452960726%7D%5D&isCdn=1&publish_time=1&signature=b6670b9d81ca1a4e52e186c4db74c7f2
url = "https://expendablekmrcdn.kugou.com/container/v2/image"
params = {
"album_image_type": -3,
"appid": 1005,
"author_image_type": "4,5",
"clientver": 12029,
"count": 5,
"data": json.dumps([
{
"hash": hash_.lower()
}
]),
"isCdn": 1,
"publish_time": 1
}
uuid = createMD5(str(random.randint(100000, 999999)) + '114514')
req = await signRequest(url, params, {
'method': 'GET',
'headers': {
'User-Agent': 'Android712-AndroidPhone-11451-18-0-Avatar-wifi',
'KG-THash': '2a2624f',
'KG-RC': '1',
'KG-Fake': '0',
'KG-RF': '0074c2c4',
'appid': '1005',
'clientver': '11451',
'uuid': uuid,
},
'cache': 86400 * 30 if use_cache else 'no-cache',
'cache-ignore': [uuid]
}, 'OIlwieks28dk2k092lksi2UIkp')
authors = req.json()['data'][0]['author']
res = []
for a in authors:
res.append({
'name': a['author_name'],
'id': a['author_id'],
'avatar': a['sizable_avatar'].format(size = 1080),
'sizable_avatar': a['sizable_avatar'],
})
return res

78
modules/kg/player.py Normal file
View File

@ -0,0 +1,78 @@
# ----------------------------------------
# - mode: python -
# - author: helloplhm-qwq -
# - name: player.py -
# - project: lx-music-api-server -
# - license: MIT -
# ----------------------------------------
# This file is part of the "lx-music-api-server" project.
from common.exceptions import FailedException
from common import utils
from .utils import getKey, signRequest, tools
from .musicInfo import getMusicInfo
import time
async def url(songId, quality):
songId = songId.lower()
body_ = await getMusicInfo(songId)
thash = body_['audio_info'][tools.qualityHashMap[quality]]
albumid = body_['album_info']['album_id'] if (body_.get('album_info') and body_['album_info'].get('album_id')) else None
albumaudioid = body_['album_audio_id'] if (body_.get('album_audio_id')) else None
if (not thash):
raise FailedException('获取歌曲信息失败')
if (not albumid):
albumid = ""
if (not albumaudioid):
albumaudioid = ""
thash = thash.lower()
params = {
'album_id': albumid,
'userid': tools.userid,
'area_code': 1,
'hash': thash,
'module': '',
'mid': tools.mid,
'appid': tools.appid,
'ssa_flag': 'is_fromtrack',
'clientver': tools.clientver,
'open_time': time.strftime("%Y%m%d"),
'vipType': 6,
'ptype': 0,
'token': tools.token,
'auth': '',
'mtype': 0,
'album_audio_id': albumaudioid,
'behavior': 'play',
'clienttime': int(time.time()),
'pid': 2,
'key': getKey(thash),
'dfid': '-',
'pidversion': 3001
}
if (tools.version == 'v5'):
params['quality'] = tools.qualityMap[quality]
if (tools.version == "v4"):
params['version'] = tools.clientver
params = utils.mergeDict(tools["extra_params"], params)
headers = {
'User-Agent': 'Android712-AndroidPhone-8983-18-0-NetMusic-wifi',
'KG-THash': '3e5ec6b',
'KG-Rec': '1',
'KG-RC': '1',
}
if (tools['x-router']['enable']):
headers['x-router'] = tools['x-router']['value']
req = await signRequest(tools.url, params, {'headers': headers})
body = req.json()
if body['status'] == 3:
raise FailedException('该歌曲在酷狗没有版权,请换源播放')
elif body['status'] == 2:
raise FailedException('链接获取失败,请检查账号是否有会员或数字专辑是否购买')
elif body['status'] != 1:
raise FailedException('链接获取失败可能是数字专辑或者api失效')
return {
'url': body.url[0],
'quality': quality
}

64
modules/kg/utils.py Normal file
View File

@ -0,0 +1,64 @@
# ----------------------------------------
# - mode: python -
# - author: helloplhm-qwq -
# - name: utils.py -
# - project: lx-music-api-server -
# - license: MIT -
# ----------------------------------------
# This file is part of the "lx-music-api-server" project.
from common import utils
from common import config
from common import Httpx
createObject = utils.CreateObject
tools = createObject({
"signkey": config.read_config("module.kg.client.signatureKey"),
"pidversec": config.read_config("module.kg.client.pidversionsecret"),
"clientver": config.read_config("module.kg.client.clientver"),
"x-router": config.read_config("module.kg.tracker.x-router"),
"url": config.read_config("module.kg.tracker.host") + config.read_config("module.kg.tracker.path"),
"version": config.read_config("module.kg.tracker.version"),
"userid": config.read_config("module.kg.user.userid"),
"token": config.read_config("module.kg.user.token"),
"mid": config.read_config("module.kg.user.mid"),
"extra_params": config.read_config("module.kg.tracker.extra_params"),
"appid": config.read_config("module.kg.client.appid"),
'qualityHashMap': {
'128k': 'hash_128',
'320k': 'hash_320',
'flac': 'hash_flac',
'flac24bit': 'hash_high',
'master': 'hash_128',
},
'qualityMap': {
'128k': '128',
'320k': '320',
'flac': 'flac',
'flac24bit': 'high',
'master': 'viper_atmos',
},
})
def buildSignatureParams(dictionary, body = ""):
joined_str = ''.join([f'{k}={v}' for k, v in dictionary.items()])
return joined_str + body
def buildRequestParams(dictionary):
joined_str = '&'.join([f'{k}={v}' for k, v in dictionary.items()])
return joined_str
def sign(params, body = "", signkey = tools["signkey"]):
params = utils.sortDict(params)
params = buildSignatureParams(params, body)
return utils.createMD5(signkey + params + signkey)
async def signRequest(url, params, options, signkey = tools["signkey"]):
params['signature'] = sign(params, options.get("body") if options.get("body") else (options.get("data") if options.get("data") else ""), signkey)
url = url + "?" + buildRequestParams(params)
return Httpx.request(url, options)
def getKey(hash_):
return utils.createMD5(hash_.lower() + tools.pidversec + tools.appid + tools.mid + tools.userid)

View File

@ -7,128 +7,71 @@
# ----------------------------------------
# This file is part of the "lx-music-api-server" project.
from common.exceptions import FailedException
from common import Httpx
from .player import url
from .musicInfo import getMusicInfo as _getInfo
from .utils import formatSinger
from common import utils
from common import config
from .QMWSign import sign
import ujson as json
createObject = utils.CreateObject
tools = createObject({
"fileInfo": {
"128k": {
'e': '.mp3',
'h': 'M500',
},
'320k': {
"e": '.mp3',
'h': 'M800',
},
'flac': {
"e": '.flac',
'h': 'F000',
},
'flac24bit': {
"e": '.flac',
'h': 'RS01',
},
"dolby": {
"e": ".flac",
"h": "Q000",
},
"master": {
"e": ".flac",
"h": "AI00",
async def info(songid):
req = await _getInfo(songid)
singerList = []
for s in req['track_info']['singer']:
s.pop('uin')
s.pop('title')
singerList.append(s)
file_info = {}
if (req['track_info']['file']['size_128mp3'] != 0):
file_info['128k'] = {
'size': utils.sizeFormat(int(req['track_info']['file']['size_128mp3'])),
}
},
'qualityMapReverse': {
'M500': '128k',
'M800': '320k',
'F000': 'flac',
'RS01': 'flac24bit',
'Q000': 'dolby',
'AI00': 'master'
},
"key": config.read_config("module.tx.user.qqmusic_key"),
"loginuin": config.read_config("module.tx.user.uin"),
"guid": config.read_config("module.tx.vkeyserver.guid"),
"uin": config.read_config("module.tx.vkeyserver.uin"),
"cdnaddr": config.read_config("module.tx.cdnaddr") if config.read_config("module.tx.cdnaddr") else 'http://ws.stream.qqmusic.qq.com/',
})
def signRequest(data, cache = False):
data = json.dumps(data)
s = sign(data)
headers = {}
return Httpx.request('https://u.y.qq.com/cgi-bin/musics.fcg?format=json&sign=' + s, {
'method': 'POST',
'body': data,
'headers': headers,
"cache": (86400 * 30) if cache else "no-cache"
})
async def url(songId, quality):
infoReqBody = {
"comm": {
"ct": '19',
"cv": '1859',
"uin": '0',
},
"req": {
"module": 'music.pf_song_detail_svr',
"method": 'get_song_detail_yqq',
"param": {
"song_type": 0,
"song_mid": songId,
},
},
}
infoRequest = signRequest(infoReqBody, True)
infoBody = createObject(infoRequest.json())
if (infoBody.code != 0 or infoBody.req.code != 0):
raise FailedException("获取音乐信息失败")
strMediaMid = infoBody.req.data.track_info.file.media_mid
requestBody = {
'req_0': {
'module': 'vkey.GetVkeyServer',
'method': 'CgiGetVkey',
'param': {
'filename': [f"{tools.fileInfo[quality]['h']}{strMediaMid}{tools.fileInfo[quality]['e']}"],
'guid': tools.guid,
'songmid': [songId],
'songtype': [0],
'uin': tools.uin,
'loginflag': 1,
'platform': '20',
},
},
'comm': {
"qq": tools.loginuin,
"authst": tools.key,
"ct": "26",
"cv": "2010101",
"v": "2010101"
},
}
req = signRequest(requestBody)
body = createObject(req.json())
data = body.req_0.data.midurlinfo[0]
url = data['purl']
if (not url):
raise FailedException('failed')
try:
resultQuality = data['filename'].split('.')[0].replace(data['songmid'], '')
except:
resultQuality = None
if (req['track_info']['file']['size_320mp3'] != 0):
file_info['320k'] = {
'size': utils.sizeFormat(int(req['track_info']['file']['size_320mp3'])),
}
if (req['track_info']['file']['size_flac'] != 0):
file_info['flac'] = {
'size': utils.sizeFormat(int(req['track_info']['file']['size_flac'])),
}
if (req['track_info']['file']['size_hires'] != 0):
file_info['flac24bit'] = {
'size': utils.sizeFormat(int(req['track_info']['file']['size_hires'])),
}
if (req['track_info']['file']['size_dolby'] != 0):
file_info['dolby'] = {
'size': utils.sizeFormat(int(req['track_info']['file']['size_dolby'])),
}
if (req['track_info']['file']['size_new'][0] != 0):
file_info['master'] = {
'size': utils.sizeFormat(int(req['track_info']['file']['size_new'][0])),
}
genres = []
for g in req['info']['genre']['content']:
genres.append(g['value'])
return {
'url': tools.cdnaddr + url,
'quality': tools.qualityMapReverse[resultQuality]
'name': req['track_info']['title'] + ' ' + req['track_info']['subtitle'].strip(),
'name_ori': req['track_info']['title'],
'name_extra': req['track_info']['subtitle'].strip(),
'singer': formatSinger(req['track_info']['singer']),
'singer_list': singerList,
'format_length': utils.timeLengthFormat(int(req['track_info']['interval'])),
'length': int(req['track_info']['interval']),
'media_mid': req['track_info']['file']['media_mid'],
'file_info': file_info,
'songmid': req['track_info']['mid'],
'album_id': req['track_info']['album']['id'],
'album_mid': req['track_info']['album']['mid'],
'album': req['track_info']['album']['title'] + ' ' + req['track_info']['album']['subtitle'].strip(),
'language': req['info']['lan']['content'][0]['value'],
'cover': f'https://y.qq.com/music/photo_new/T002R800x800M000{req["track_info"]["album"]["pmid"]}.jpg',
'sizable_cover': 'https://y.qq.com/music/photo_new/T002R{size}x{size}M000' + f'{req["track_info"]["album"]["pmid"]}.jpg',
'publish_date': req['track_info']['time_public'],
'mvid': req['track_info']['mv']['id'],
'genre': genres,
'kmid': req['track_info']['ksong']['mid'],
'kid': req['track_info']['ksong']['id'],
'bpm': req['track_info']['bpm'],
}

34
modules/tx/musicInfo.py Normal file
View File

@ -0,0 +1,34 @@
# ----------------------------------------
# - mode: python -
# - author: helloplhm-qwq -
# - name: musicInfo.py -
# - project: lx-music-api-server -
# - license: MIT -
# ----------------------------------------
# This file is part of the "lx-music-api-server" project.
from common.exceptions import FailedException
from .utils import signRequest
async def getMusicInfo(songid):
infoReqBody = {
"comm": {
"ct": '19',
"cv": '1859',
"uin": '0',
},
"req": {
"module": 'music.pf_song_detail_svr',
"method": 'get_song_detail_yqq',
"param": {
"song_type": 0,
"song_mid": songid,
},
},
}
infoRequest = await signRequest(infoReqBody, True)
infoBody = infoRequest.json()
if (infoBody['code'] != 0 or infoBody['req']['code'] != 0):
raise FailedException("获取音乐信息失败")
return infoBody['req']['data']

56
modules/tx/player.py Normal file
View File

@ -0,0 +1,56 @@
# ----------------------------------------
# - mode: python -
# - author: helloplhm-qwq -
# - name: player.py -
# - project: lx-music-api-server -
# - license: MIT -
# ----------------------------------------
# This file is part of the "lx-music-api-server" project.
from common.exceptions import FailedException
from common import utils
from .musicInfo import getMusicInfo
from .utils import tools
from .utils import signRequest
createObject = utils.CreateObject
async def url(songId, quality):
infoBody = await getMusicInfo(songId)
strMediaMid = infoBody['track_info']['file']['media_mid']
requestBody = {
'req_0': {
'module': 'vkey.GetVkeyServer',
'method': 'CgiGetVkey',
'param': {
'filename': [f"{tools.fileInfo[quality]['h']}{strMediaMid}{tools.fileInfo[quality]['e']}"],
'guid': tools.guid,
'songmid': [songId],
'songtype': [0],
'uin': tools.uin,
'loginflag': 1,
'platform': '20',
},
},
'comm': {
"qq": tools.loginuin,
"authst": tools.key,
"ct": "26",
"cv": "2010101",
"v": "2010101"
},
}
req = await signRequest(requestBody)
body = createObject(req.json())
data = body.req_0.data.midurlinfo[0]
url = data['purl']
if (not url):
raise FailedException('failed')
resultQuality = data['filename'].split('.')[0][:4]
return {
'url': tools.cdnaddr + url,
'quality': tools.qualityMapReverse[resultQuality]
}

76
modules/tx/utils.py Normal file
View File

@ -0,0 +1,76 @@
# ----------------------------------------
# - mode: python -
# - author: helloplhm-qwq -
# - name: utils.py -
# - project: lx-music-api-server -
# - license: MIT -
# ----------------------------------------
# This file is part of the "lx-music-api-server" project.
from common.exceptions import FailedException
from common import Httpx
from common import utils
from common import config
from .QMWSign import sign
import ujson as json
createObject = utils.CreateObject
tools = createObject({
"fileInfo": {
"128k": {
'e': '.mp3',
'h': 'M500',
},
'320k': {
"e": '.mp3',
'h': 'M800',
},
'flac': {
"e": '.flac',
'h': 'F000',
},
'flac24bit': {
"e": '.flac',
'h': 'RS01',
},
"dolby": {
"e": ".flac",
"h": "Q000",
},
"master": {
"e": ".flac",
"h": "AI00",
}
},
'qualityMapReverse': {
'M500': '128k',
'M800': '320k',
'F000': 'flac',
'RS01': 'flac24bit',
'Q000': 'dolby',
'AI00': 'master'
},
"key": config.read_config("module.tx.user.qqmusic_key"),
"loginuin": config.read_config("module.tx.user.uin"),
"guid": config.read_config("module.tx.vkeyserver.guid"),
"uin": config.read_config("module.tx.vkeyserver.uin"),
"cdnaddr": config.read_config("module.tx.cdnaddr") if config.read_config("module.tx.cdnaddr") else 'http://ws.stream.qqmusic.qq.com/',
})
async def signRequest(data, cache = False):
data = json.dumps(data)
s = sign(data)
headers = {}
return Httpx.request('https://u.y.qq.com/cgi-bin/musics.fcg?format=json&sign=' + s, {
'method': 'POST',
'body': data,
'headers': headers,
"cache": (86400 * 30) if cache else "no-cache"
})
def formatSinger(singerList):
n = []
for s in singerList:
n.append(s['name'])
return ''.join(n)