feat: 更新循环任务调度器,修复启动后服务器停不掉的问题

This commit is contained in:
helloplhm-qwq 2023-12-23 16:40:28 +08:00
parent d4056fcb7b
commit d9a13c5fe4
No known key found for this signature in database
GPG Key ID: 6BE1B64B905567C7
2 changed files with 24 additions and 16 deletions

View File

@ -10,12 +10,13 @@
# 一个简单的循环任务调度器 # 一个简单的循环任务调度器
import time import time
import threading import asyncio
from .variable import running import traceback
from .utils import timestamp_format
from . import log from . import log
logger = log.log("scheduler") logger = log.log("scheduler")
running_event = asyncio.Event()
global tasks global tasks
tasks = [] tasks = []
@ -29,12 +30,14 @@ class taskWrapper:
def check_available(self): def check_available(self):
return (time.time() - self.latest_execute) >= self.interval return (time.time() - self.latest_execute) >= self.interval
def run(self): async def run(self):
try: try:
logger.info(f"task {self.name} run start") logger.info(f"task {self.name} run start")
self.function() await self.function()
logger.info(f'task {self.name} run success, next execute: {timestamp_format(self.interval + self.latest_execute)}')
except Exception as e: except Exception as e:
logger.error(f"task {self.name} run failed, waiting for next execute...") logger.error(f"task {self.name} run failed, waiting for next execute...")
logger.error(traceback.format_exc())
def append(name, task, interval = 86400): def append(name, task, interval = 86400):
global tasks global tasks
@ -42,18 +45,17 @@ def append(name, task, interval = 86400):
wrapper = taskWrapper(name, task, interval) wrapper = taskWrapper(name, task, interval)
return tasks.append(wrapper) return tasks.append(wrapper)
def thread_runner(): # 在 thread_runner 函数中修改循环逻辑
global tasks async def thread_runner():
while True: global tasks, running_event
if not running: while not running_event.is_set():
return
for t in tasks: for t in tasks:
if t.check_available(): if t.check_available() and not running_event.is_set():
t.latest_execute = int(time.time()) t.latest_execute = int(time.time())
threading.Thread(target = t.run).start() await t.run() # 等待异步任务完成
time.sleep(1) await asyncio.sleep(1)
def run(): async def run():
logger.debug("scheduler thread starting...") logger.debug("scheduler thread starting...")
threading.Thread(target = thread_runner).start() task = asyncio.create_task(thread_runner())
logger.debug("schedluer thread load success") logger.debug("schedluer thread load success")

View File

@ -14,6 +14,7 @@ from common import lxsecurity
from common import log from common import log
from common import Httpx from common import Httpx
from common import variable from common import variable
from common import scheduler
from aiohttp.web import Response from aiohttp.web import Response
import ujson as json import ujson as json
import threading import threading
@ -130,6 +131,7 @@ async def run_app():
logger.info(f"监听 -> http://{host}:{port}") logger.info(f"监听 -> http://{host}:{port}")
async def initMain(): async def initMain():
await scheduler.run()
variable.aioSession = aiohttp.ClientSession() variable.aioSession = aiohttp.ClientSession()
try: try:
await run_app() await run_app()
@ -147,7 +149,11 @@ async def initMain():
logger.error("遇到未知错误,请查看日志") logger.error("遇到未知错误,请查看日志")
logger.error(traceback.format_exc()) logger.error(traceback.format_exc())
finally: finally:
logger.info('wating for sessions to complete...')
if variable.aioSession:
await variable.aioSession.close() await variable.aioSession.close()
variable.running = False
logger.info("Server stopped") logger.info("Server stopped")
if __name__ == "__main__": if __name__ == "__main__":