mirror of
https://github.com/MeoProject/lx-music-api-server.git
synced 2025-07-06 22:42:14 +08:00
chore: 优化代码
This commit is contained in:
@ -12,23 +12,23 @@ from Crypto.Cipher import AES, DES
|
||||
import binascii
|
||||
import base64
|
||||
|
||||
def AESEncrypt(plainText, key, iv):
|
||||
def createAesEncrypt(plainText, key, iv):
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
if isinstance(plainText, str):
|
||||
plainText = plainText.encode('utf-8')
|
||||
return cipher.encrypt(pad(plainText))
|
||||
|
||||
def AESDecrypt(cipherText, key, iv):
|
||||
def createAesDecrypt(cipherText, key, iv):
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
return unpad(cipher.decrypt(cipherText))
|
||||
|
||||
def hexAESDecrypt(cipherText, key, iv):
|
||||
def createAesEncryptByHex(cipherText, key, iv):
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
if isinstance(cipherText, str):
|
||||
cipherText = cipherText.encode('utf-8')
|
||||
return unpad(cipher.decrypt(binascii.unhexlify(cipherText)))
|
||||
|
||||
def base64AESDecrypt(cipherText, key, iv):
|
||||
def createAesEncryptByBase64(cipherText, key, iv):
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
if isinstance(cipherText, str):
|
||||
cipherText = cipherText.encode('utf-8')
|
||||
|
@ -95,12 +95,12 @@ def request(url, options = {}):
|
||||
for i in options.get('cache-ignore'):
|
||||
cache_key = cache_key.replace(str(i), '')
|
||||
options.pop('cache-ignore')
|
||||
cache_key = utils.md5(cache_key)
|
||||
cache_key = utils.createMD5(cache_key)
|
||||
if options.get("cache") and options["cache"] != "no-cache":
|
||||
cache = config.getCache("httpx", cache_key)
|
||||
if cache:
|
||||
logger.debug(f"请求 {url} 有可用缓存")
|
||||
return pickle.loads(utils.from_base64(cache["data"]))
|
||||
return pickle.loads(utils.createBase64Decode(cache["data"]))
|
||||
if "cache" in list(options.keys()):
|
||||
cache_info = options.get("cache")
|
||||
options.pop("cache")
|
||||
@ -170,7 +170,7 @@ def request(url, options = {}):
|
||||
if (cache_info and cache_info != "no-cache"):
|
||||
cache_data = pickle.dumps(req)
|
||||
expire_time = (cache_info if isinstance(cache_info, int) else 3600) + int(time.time())
|
||||
config.updateCache("httpx", cache_key, {"expire": True, "time": expire_time, "data": utils.to_base64(cache_data)})
|
||||
config.updateCache("httpx", cache_key, {"expire": True, "time": expire_time, "data": utils.createBase64Encode(cache_data)})
|
||||
logger.debug("缓存已更新: " + url)
|
||||
# 返回请求
|
||||
return req
|
||||
@ -179,7 +179,7 @@ def request(url, options = {}):
|
||||
def checkcn():
|
||||
try:
|
||||
req = request("https://mips.kugou.com/check/iscn?&format=json")
|
||||
body = utils.jsobject(req.json())
|
||||
body = utils.CreateObject(req.json())
|
||||
variable.iscn = bool(body.flag)
|
||||
if (not variable.iscn):
|
||||
variable.fakeip = config.read_config('common.fakeip')
|
||||
@ -211,12 +211,12 @@ async def asyncrequest(url, options = {}):
|
||||
for i in options.get('cache-ignore'):
|
||||
cache_key = cache_key.replace(str(i), '')
|
||||
options.pop('cache-ignore')
|
||||
cache_key = utils.md5(cache_key)
|
||||
cache_key = utils.createMD5(cache_key)
|
||||
if options.get("cache") and options["cache"] != "no-cache":
|
||||
cache = config.getCache("httpx", cache_key)
|
||||
if cache:
|
||||
logger.debug(f"请求 {url} 有可用缓存")
|
||||
return pickle.loads(utils.from_base64(cache["data"]))
|
||||
return pickle.loads(utils.createBase64Decode(cache["data"]))
|
||||
if "cache" in list(options.keys()):
|
||||
cache_info = options.get("cache")
|
||||
options.pop("cache")
|
||||
|
@ -14,7 +14,7 @@ import os
|
||||
from pygments import highlight
|
||||
from pygments.lexers import PythonLexer
|
||||
from pygments.formatters import TerminalFormatter
|
||||
from .utils import sanitize_filename, add_to_global_namespace
|
||||
from .utils import filterFileName, addToGlobalNamespace
|
||||
from .variable import debug_mode, log_length_limit
|
||||
|
||||
if not os.path.exists("logs"):
|
||||
@ -66,7 +66,7 @@ class log:
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
)
|
||||
if filename:
|
||||
filename = sanitize_filename(filename)
|
||||
filename = filterFileName(filename)
|
||||
else:
|
||||
filename = './logs/' + module_name + '.log'
|
||||
file_handler = logging.FileHandler(filename, encoding = "utf-8")
|
||||
@ -159,4 +159,4 @@ printlogger = log('print')
|
||||
def logprint(*args, sep = ' ', end = '', file = None, flush = None):
|
||||
printlogger.info(sep.join(str(arg) for arg in args), allow_hidden = False)
|
||||
|
||||
add_to_global_namespace('print', logprint)
|
||||
addToGlobalNamespace('print', logprint)
|
||||
|
@ -21,9 +21,8 @@ def checklxmheader(lxm, url):
|
||||
|
||||
cop, version = tuple(lxm.split('&'))
|
||||
version = (3 - len(version) % 3) * '0' + version
|
||||
cop = utils.inflate_raw_sync(binascii.unhexlify(cop.encode('utf-8'))).decode('utf-8')
|
||||
cop = utils.from_base64(cop).decode('utf-8')
|
||||
# print(retvalue + version)
|
||||
cop = utils.handleInflateRawSync(binascii.unhexlify(cop.encode('utf-8'))).decode('utf-8')
|
||||
cop = utils.createBase64Decode(cop).decode('utf-8')
|
||||
arr, outsideversion = tuple([cop.split(']')[0] + ']', cop.split(']')[1]])
|
||||
arr = json.loads(arr)
|
||||
version = re.findall("\\d+", version)[0]
|
||||
|
@ -17,27 +17,27 @@ import re
|
||||
import ujson as json
|
||||
import xmltodict
|
||||
from urllib.parse import quote
|
||||
from hashlib import md5 as _md5
|
||||
from hashlib import md5 as handleCreateMD5
|
||||
from aiohttp.web import Response
|
||||
# from flask import Response
|
||||
|
||||
def to_base64(data_bytes):
|
||||
def createBase64Encode(data_bytes):
|
||||
encoded_data = base64.b64encode(data_bytes)
|
||||
return encoded_data.decode('utf-8')
|
||||
|
||||
def to_hex(data_bytes):
|
||||
def createHexEncode(data_bytes):
|
||||
hex_encoded = binascii.hexlify(data_bytes)
|
||||
return hex_encoded.decode('utf-8')
|
||||
|
||||
def from_base64(data):
|
||||
def createBase64Decode(data):
|
||||
decoded_data = base64.b64decode(data)
|
||||
return decoded_data
|
||||
|
||||
def from_hex(data):
|
||||
def createHexDecode(data):
|
||||
decoded_data = binascii.unhexlify(data.decode('utf-8'))
|
||||
return decoded_data
|
||||
|
||||
def inflate_raw_sync(data):
|
||||
def handleInflateRawSync(data):
|
||||
decompress_obj = zlib.decompressobj(-zlib.MAX_WBITS)
|
||||
decompressed_data = decompress_obj.decompress(data) + decompress_obj.flush()
|
||||
return decompressed_data
|
||||
@ -54,10 +54,10 @@ def require(module):
|
||||
index += 1
|
||||
return _module
|
||||
|
||||
def add_to_global_namespace(key, data):
|
||||
def addToGlobalNamespace(key, data):
|
||||
setattr(builtins, key, data)
|
||||
|
||||
def sanitize_filename(filename):
|
||||
def filterFileName(filename):
|
||||
if platform.system() == 'Windows' or platform.system() == 'Cygwin':
|
||||
# Windows不合法文件名字符
|
||||
illegal_chars = r'[<>:"/\\|?*\x00-\x1f]'
|
||||
@ -67,21 +67,19 @@ def sanitize_filename(filename):
|
||||
# 将不合法字符替换为下划线
|
||||
return re.sub(illegal_chars, '_', filename)
|
||||
|
||||
def md5(s: str):
|
||||
# 计算md5
|
||||
# print(s)
|
||||
return _md5(s.encode("utf-8")).hexdigest()
|
||||
def createMD5(s: str):
|
||||
return handleCreateMD5(s.encode("utf-8")).hexdigest()
|
||||
|
||||
def readfile(path, mode = "text"):
|
||||
def readFile(path, mode = "text"):
|
||||
try:
|
||||
fileObj = open(path, "rb")
|
||||
except FileNotFoundError:
|
||||
return "file not found"
|
||||
content = fileObj.read()
|
||||
if mode == "base64":
|
||||
return to_base64(content)
|
||||
return createBase64Encode(content)
|
||||
elif mode == "hex":
|
||||
return to_hex(content)
|
||||
return createHexEncode(content)
|
||||
elif mode == "text":
|
||||
return content.decode("utf-8")
|
||||
else:
|
||||
@ -92,29 +90,29 @@ def unique_list(list_in):
|
||||
[unique_list.append(x) for x in list_in if x not in unique_list]
|
||||
return unique_list
|
||||
|
||||
def handle_response(dic, status = 200):
|
||||
def handleResult(dic, status = 200):
|
||||
return Response(body = json.dumps(dic, indent=2, ensure_ascii=False), content_type='application/json', status = status)
|
||||
|
||||
def encodeURIComponent(component):
|
||||
return quote(component)
|
||||
|
||||
def sort_dict(dictionary):
|
||||
def sortDict(dictionary):
|
||||
sorted_items = sorted(dictionary.items())
|
||||
sorted_dict = {k: v for k, v in sorted_items}
|
||||
return sorted_dict
|
||||
|
||||
def merge_dict(dict1, dict2):
|
||||
def mergeDict(dict1, dict2):
|
||||
merged_dict = dict2.copy()
|
||||
merged_dict.update(dict1)
|
||||
return merged_dict
|
||||
|
||||
class jsobject(dict):
|
||||
class CreateObject(dict):
|
||||
def __init__(self, d):
|
||||
super().__init__(d)
|
||||
self._raw = d
|
||||
for key, value in d.items():
|
||||
if isinstance(value, dict):
|
||||
setattr(self, key, jsobject(value))
|
||||
setattr(self, key, CreateObject(value))
|
||||
else:
|
||||
setattr(self, key, value)
|
||||
|
||||
@ -126,7 +124,7 @@ class jsobject(dict):
|
||||
def to_dict(self):
|
||||
result = {}
|
||||
for key, value in self.items():
|
||||
if isinstance(value, jsobject):
|
||||
if isinstance(value, CreateObject):
|
||||
result[key] = value.to_dict()
|
||||
else:
|
||||
result[key] = value
|
||||
@ -141,5 +139,5 @@ def dump_xml(data):
|
||||
def load_xml(data):
|
||||
return xmltodict.parse(data)
|
||||
|
||||
add_to_global_namespace('require', require)
|
||||
addToGlobalNamespace('require', require)
|
||||
|
||||
|
@ -28,7 +28,7 @@ logger = log.log("main")
|
||||
from common import utils
|
||||
from common import lxsecurity
|
||||
from common import Httpx
|
||||
from apis import SongURL
|
||||
from modules import SongURL
|
||||
import traceback
|
||||
import time
|
||||
Httpx.checkcn()
|
||||
|
26
main.py
26
main.py
@ -15,7 +15,7 @@ from common import lxsecurity
|
||||
from common import utils
|
||||
from common import log
|
||||
from common import Httpx
|
||||
from apis import handle_api_request
|
||||
from modules import handleApiRequest
|
||||
import traceback
|
||||
import time
|
||||
|
||||
@ -32,20 +32,20 @@ async def handle_before_request(app, handler):
|
||||
request.remote = request.headers.get("X-Real-IP")
|
||||
# check ip
|
||||
if (config.check_ip_banned(request.remote)):
|
||||
return utils.handle_response({"code": 1, "msg": "您的IP已被封禁", "data": None}, 403)
|
||||
return utils.handleResult({"code": 1, "msg": "您的IP已被封禁", "data": None}, 403)
|
||||
# check global rate limit
|
||||
if (
|
||||
(time.time() - config.getRequestTime('global'))
|
||||
<
|
||||
(config.read_config("security.rate_limit.global"))
|
||||
):
|
||||
return utils.handle_response({"code": 5, "msg": "全局限速", "data": None}, 429)
|
||||
return utils.handleResult({"code": 5, "msg": "全局限速", "data": None}, 429)
|
||||
if (
|
||||
(time.time() - config.getRequestTime(request.remote))
|
||||
<
|
||||
(config.read_config("security.rate_limit.ip"))
|
||||
):
|
||||
return utils.handle_response({"code": 5, "msg": "IP限速", "data": None}, 429)
|
||||
return utils.handleResult({"code": 5, "msg": "IP限速", "data": None}, 429)
|
||||
# update request time
|
||||
config.updateRequestTime('global')
|
||||
config.updateRequestTime(request.remote)
|
||||
@ -54,21 +54,21 @@ async def handle_before_request(app, handler):
|
||||
if request.remote_host.split(":")[0] not in config.read_config("security.allowed_host.list"):
|
||||
if config.read_config("security.allowed_host.blacklist.enable"):
|
||||
config.ban_ip(request.remote, int(config.read_config("security.allowed_host.blacklist.length")))
|
||||
return utils.handle_response({'code': 6, 'msg': '未找到您所请求的资源', 'data': None}, 404)
|
||||
return utils.handleResult({'code': 6, 'msg': '未找到您所请求的资源', 'data': None}, 404)
|
||||
try:
|
||||
resp = await handler(request)
|
||||
aiologger.info(f'{request.remote} - {request.method} "{request.path}", {resp.status}')
|
||||
return resp
|
||||
except web.HTTPException as ex:
|
||||
if ex.status == 500: # 捕获500错误
|
||||
return utils.handle_response({"code": 4, "msg": "内部服务器错误", "data": None}, 500)
|
||||
return utils.handleResult({"code": 4, "msg": "内部服务器错误", "data": None}, 500)
|
||||
else:
|
||||
logger.error(traceback.format_exc())
|
||||
return utils.handle_response({'code': 6, 'msg': '未找到您所请求的资源', 'data': None}, 404)
|
||||
return utils.handleResult({'code': 6, 'msg': '未找到您所请求的资源', 'data': None}, 404)
|
||||
return handle_request
|
||||
|
||||
async def main(request):
|
||||
return utils.handle_response({"code": 0, "msg": "success", "data": None})
|
||||
return utils.handleResult({"code": 0, "msg": "success", "data": None})
|
||||
|
||||
|
||||
async def handle(request):
|
||||
@ -80,22 +80,22 @@ async def handle(request):
|
||||
if (request.headers.get("X-Request-Key")) != config.read_config("security.key.value"):
|
||||
if (config.read_config("security.key.ban")):
|
||||
config.ban_ip(request.remote)
|
||||
return utils.handle_response({"code": 1, "msg": "key验证失败", "data": None}, 403)
|
||||
return utils.handleResult({"code": 1, "msg": "key验证失败", "data": None}, 403)
|
||||
if (config.read_config('security.check_lxm.enable') and request.host.split(':')[0] not in config.read_config('security.whitelist_host')):
|
||||
lxm = request.headers.get('lxm')
|
||||
if (not lxsecurity.checklxmheader(lxm, request.url)):
|
||||
if (config.read_config('security.lxm_ban.enable')):
|
||||
config.ban_ip(request.remote)
|
||||
return utils.handle_response({"code": 1, "msg": "lxm请求头验证失败", "data": None}, 403)
|
||||
return utils.handleResult({"code": 1, "msg": "lxm请求头验证失败", "data": None}, 403)
|
||||
|
||||
try:
|
||||
return utils.handle_response(await handle_api_request(method, source, songId, quality))
|
||||
return utils.handleResult(await handleApiRequest(method, source, songId, quality))
|
||||
except Exception as e:
|
||||
logger.error(traceback.format_exc())
|
||||
return utils.handle_response({'code': 4, 'msg': '内部服务器错误', 'data': None}, 500)
|
||||
return utils.handleResult({'code': 4, 'msg': '内部服务器错误', 'data': None}, 500)
|
||||
|
||||
async def handle_404(request):
|
||||
return utils.handle_response({'code': 6, 'msg': '未找到您所请求的资源', 'data': None}, 404)
|
||||
return utils.handleResult({'code': 6, 'msg': '未找到您所请求的资源', 'data': None}, 404)
|
||||
|
||||
app = web.Application(middlewares=[handle_before_request])
|
||||
# mainpage
|
||||
|
@ -48,7 +48,7 @@ sourceExpirationTime = {
|
||||
}
|
||||
|
||||
|
||||
async def handle_api_request(command, source, songId, quality):
|
||||
async def handleApiRequest(command, source, songId, quality):
|
||||
if (source == "kg"):
|
||||
songId = songId.lower()
|
||||
try:
|
||||
@ -63,7 +63,7 @@ async def handle_api_request(command, source, songId, quality):
|
||||
except:
|
||||
logger.error(traceback.format_exc())
|
||||
try:
|
||||
func = require('apis.' + source + '.' + command)
|
||||
func = require('modules.' + source + '.' + command)
|
||||
except:
|
||||
return {
|
||||
'code': 1,
|
@ -14,17 +14,17 @@ from common import Httpx
|
||||
import ujson as json
|
||||
import time
|
||||
|
||||
jsobject = utils.jsobject
|
||||
createObject = utils.CreateObject
|
||||
|
||||
def buildsignparams(dictionary, body = ""):
|
||||
def buildSignatureParams(dictionary, body = ""):
|
||||
joined_str = ''.join([f'{k}={v}' for k, v in dictionary.items()])
|
||||
return joined_str + body
|
||||
|
||||
def buildrequestparams(dictionary):
|
||||
def buildRequestParams(dictionary):
|
||||
joined_str = '&'.join([f'{k}={v}' for k, v in dictionary.items()])
|
||||
return joined_str
|
||||
|
||||
tools = jsobject({
|
||||
tools = createObject({
|
||||
"signkey": config.read_config("module.kg.client.signatureKey"),
|
||||
"pidversec": config.read_config("module.kg.client.pidversionsecret"),
|
||||
"clientver": config.read_config("module.kg.client.clientver"),
|
||||
@ -53,18 +53,18 @@ tools = jsobject({
|
||||
})
|
||||
|
||||
def sign(params, body = ""):
|
||||
params = utils.sort_dict(params)
|
||||
params = buildsignparams(params, body)
|
||||
return utils.md5(tools["signkey"] + params + tools["signkey"])
|
||||
params = utils.sortDict(params)
|
||||
params = buildSignatureParams(params, body)
|
||||
return utils.createMD5(tools["signkey"] + params + tools["signkey"])
|
||||
|
||||
def signRequest(url, params, options):
|
||||
params = utils.merge_dict(tools["extra_params"], params)
|
||||
url = url + "?" + buildrequestparams(params) + "&signature=" + sign(params, options.get("body") if options.get("body") else (options.get("data") if options.get("data") else ""))
|
||||
params = utils.mergeDict(tools["extra_params"], params)
|
||||
url = url + "?" + buildRequestParams(params) + "&signature=" + sign(params, options.get("body") if options.get("body") else (options.get("data") if options.get("data") else ""))
|
||||
return Httpx.request(url, options)
|
||||
|
||||
def getKey(hash_):
|
||||
# print(hash_ + tools.pidversec + tools.appid + tools.mid + tools.userid)
|
||||
return utils.md5(hash_.lower() + tools.pidversec + tools.appid + tools.mid + tools.userid)
|
||||
return utils.createMD5(hash_.lower() + tools.pidversec + tools.appid + tools.mid + tools.userid)
|
||||
|
||||
async def getMusicInfo(hash_):
|
||||
tn = int(time.time())
|
||||
@ -145,7 +145,7 @@ async def url(songId, quality):
|
||||
# print(params.quality)
|
||||
if (tools.version == "v4"):
|
||||
params['version'] = tools.clientver
|
||||
headers = jsobject({
|
||||
headers = createObject({
|
||||
'User-Agent': 'Android712-AndroidPhone-8983-18-0-NetMusic-wifi',
|
||||
'KG-THash': '3e5ec6b',
|
||||
'KG-Rec': '1',
|
||||
@ -154,7 +154,7 @@ async def url(songId, quality):
|
||||
if (tools['x-router']['enable']):
|
||||
headers['x-router'] = tools['x-router']['value']
|
||||
req = signRequest(tools.url, params, {'headers': headers})
|
||||
body = jsobject(req.json())
|
||||
body = createObject(req.json())
|
||||
|
||||
if body.status == 3:
|
||||
raise FailedException('该歌曲在酷狗没有版权,请换源播放')
|
@ -7,7 +7,7 @@
|
||||
# ----------------------------------------
|
||||
# This file is part of the "lx-music-api-server" project.
|
||||
|
||||
from common.utils import md5 as _md5
|
||||
from common.utils import createMD5
|
||||
import re as _re
|
||||
|
||||
def v(b):
|
||||
@ -92,7 +92,7 @@ def t(b):
|
||||
return res
|
||||
|
||||
def sign(params):
|
||||
md5Str = _md5(params).upper()
|
||||
md5Str = createMD5(params).upper()
|
||||
h = v(md5Str)
|
||||
e = c(md5Str)
|
||||
ls = t(md5Str)
|
@ -14,9 +14,9 @@ from common import config
|
||||
from .QMWSign import sign
|
||||
import ujson as json
|
||||
|
||||
jsobject = utils.jsobject
|
||||
createObject = utils.CreateObject
|
||||
|
||||
tools = jsobject({
|
||||
tools = createObject({
|
||||
"fileInfo": {
|
||||
"128k": {
|
||||
'e': '.mp3',
|
||||
@ -80,7 +80,7 @@ async def url(songId, quality):
|
||||
},
|
||||
}
|
||||
infoRequest = signRequest(infoReqBody, True)
|
||||
infoBody = jsobject(infoRequest.json())
|
||||
infoBody = createObject(infoRequest.json())
|
||||
if (infoBody.code != 0 or infoBody.req.code != 0):
|
||||
raise FailedException("获取音乐信息失败")
|
||||
strMediaMid = infoBody.req.data.track_info.file.media_mid
|
||||
@ -107,7 +107,7 @@ async def url(songId, quality):
|
||||
},
|
||||
}
|
||||
req = signRequest(requestBody)
|
||||
body = jsobject(req.json())
|
||||
body = CreateObject(req.json())
|
||||
# js const { purl } = data.req_0.data.midurlinfo[0]
|
||||
if (not body.req_0.data.midurlinfo[0]['purl']):
|
||||
raise FailedException('failed')
|
@ -13,8 +13,9 @@ from base64 import b64encode
|
||||
from binascii import hexlify
|
||||
from hashlib import md5
|
||||
from Crypto.Cipher import AES
|
||||
from common import utils
|
||||
|
||||
__all__ = ["weEncrypt", "linuxEncrypt", "eEncrypt", "MD5"]
|
||||
__all__ = ["weEncrypt", "linuxEncrypt", "eEncrypt"]
|
||||
|
||||
MODULUS = (
|
||||
"00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7"
|
||||
@ -28,13 +29,6 @@ NONCE = b"0CoJUm6Qyw8W8jud"
|
||||
LINUXKEY = b"rFgB&h#%2?^eDg:Q"
|
||||
EAPIKEY = b'e82ckenh8dichen8'
|
||||
|
||||
|
||||
def MD5(value):
|
||||
m = md5()
|
||||
m.update(value.encode())
|
||||
return m.hexdigest()
|
||||
|
||||
|
||||
def weEncrypt(text):
|
||||
"""
|
||||
引用自 https://github.com/darknessomi/musicbox/blob/master/NEMbox/encrypt.py#L40
|
||||
@ -46,7 +40,6 @@ def weEncrypt(text):
|
||||
encseckey = rsa(secret, PUBKEY, MODULUS)
|
||||
return {"params": params, "encSecKey": encseckey}
|
||||
|
||||
|
||||
def linuxEncrypt(text):
|
||||
"""
|
||||
参考自 https://github.com/Binaryify/NeteaseCloudMusicApi/blob/master/util/crypto.js#L28
|
||||
@ -55,14 +48,12 @@ def linuxEncrypt(text):
|
||||
data = aes(text, LINUXKEY)
|
||||
return {"eparams": data.decode()}
|
||||
|
||||
|
||||
def eapiEncrypt(url, text):
|
||||
text = str(text)
|
||||
digest = MD5("nobody{}use{}md5forencrypt".format(url, text))
|
||||
digest = utils.createMD5("nobody{}use{}md5forencrypt".format(url, text))
|
||||
data = "{}-36cd479b6b5-{}-36cd479b6b5-{}".format(url, text, digest)
|
||||
return {"params": aes(data.encode(), EAPIKEY).decode("utf-8")}
|
||||
|
||||
|
||||
def aes(text, key, method={}):
|
||||
pad = 16 - len(text) % 16
|
||||
text = text + bytearray([pad] * pad)
|
||||
@ -75,7 +66,6 @@ def aes(text, key, method={}):
|
||||
return b64encode(ciphertext)
|
||||
return hexlify(ciphertext).upper()
|
||||
|
||||
|
||||
def rsa(text, pubkey, modulus):
|
||||
text = text[::-1]
|
||||
rs = pow(int(hexlify(text), 16),
|
Reference in New Issue
Block a user