208 lines
11 KiB
Python
208 lines
11 KiB
Python
import os
|
|
import time
|
|
import base64
|
|
import psutil
|
|
import statistics
|
|
import _thread as thread
|
|
|
|
from fastapi import APIRouter, HTTPException, Response
|
|
from urllib.parse import unquote
|
|
from configs.config import IMAGES_PATH
|
|
from models.mysql import pool
|
|
from utilities.download import download_image, generate_thumbnail
|
|
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
# 预热图片(获取一次图片, 遍历图片表, 检查OSS中所有被预定的尺寸是否存在, 不存在则生成)
|
|
@router.get("/warm", summary="预热图片", description="预热图片")
|
|
def warm_image(op:int=0, end:int=10, version:str='0'):
|
|
with pool.connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
cursor.execute(f"SELECT * FROM `web_images` limit {op}, {end}")
|
|
for img in cursor.fetchall():
|
|
# 如果CPU使用率大于50%, 则等待, 直到CPU使用率小于50%
|
|
while statistics.mean(psutil.cpu_percent(interval=1, percpu=True)) > 50:
|
|
print(statistics.mean(psutil.cpu_percent(interval=1, percpu=True)), '等待CPU释放...')
|
|
time.sleep(2)
|
|
# 如果内存剩余小于1G, 则等待, 直到内存剩余大于1G
|
|
while psutil.virtual_memory().available < 1024 * 1024 * 1024:
|
|
print(psutil.virtual_memory().available, '等待内存释放...')
|
|
time.sleep(2)
|
|
# CPU使用率已降低, 开始处理图片
|
|
image = download_image(img['content']) # 从OSS下载原图
|
|
if not image:
|
|
print('跳过不存在的图片:', img['content'])
|
|
continue
|
|
# 创建新线程处理图片
|
|
try:
|
|
print('开始处理图片:', img['content'])
|
|
thread.start_new_thread(generate_thumbnail, (image, img['id'], version, 3, 328, 'webp'))
|
|
thread.start_new_thread(generate_thumbnail, (image, img['id'], version, 2, 328, 'webp'))
|
|
thread.start_new_thread(generate_thumbnail, (image, img['id'], version, 1, 328, 'webp'))
|
|
except:
|
|
print('无法启动线程')
|
|
return Response('预热成功', status_code=200, media_type='text/plain', headers={'Content-Type': 'text/plain; charset=utf-8'})
|
|
|
|
|
|
# 获取非标准类缩略图
|
|
@router.get("/{type}-{id}-{version}@{n}x{w}.{ext}", summary="获取非标准类缩略图", description="/img/article-233-version@1x320.webp 通过webp后缀获取webp格式图片, 通过@1x320后缀获取1x缩略图, 通过@2x320后缀获取2x缩略图, 通过@3x320后缀获取3x缩略图")
|
|
def get_image_type_thumbnail(type:str, id:str, version:str, n:int, w:int, ext:str):
|
|
with pool.connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
img_path = f"{IMAGES_PATH}/{type}-{id}-{version}@{n}x{w}.{ext}"
|
|
if os.path.exists(img_path):
|
|
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
|
|
if type == 'ad' or type == 'article' or type == 'article_attribute':
|
|
count = cursor.execute(f"SELECT image FROM `web_{type}` WHERE `id`={id}")
|
|
img = cursor.fetchone()
|
|
if img is None:
|
|
print('图片不存在:', count)
|
|
return Response('图片不存在', status_code=404)
|
|
url = img[0]
|
|
elif type == 'url':
|
|
id = unquote(id, 'utf-8')
|
|
id = id.replace(' ','+')
|
|
url = unquote(base64.b64decode(id))
|
|
print(url)
|
|
elif type == 'avatar':
|
|
count = cursor.execute(f"SELECT avatar FROM `web_member` WHERE `id`={id}")
|
|
user = cursor.fetchone()
|
|
if user is None:
|
|
print('用户不存在:', count)
|
|
return Response('用户不存在', status_code=404)
|
|
url = user[0]
|
|
else:
|
|
print('图片类型不存在:', type)
|
|
return Response('图片类型不存在', status_code=404)
|
|
image = download_image(url)
|
|
if not image:
|
|
return Response('图片不存在', status_code=404)
|
|
# 如果是 avatar, 则裁剪为正方形
|
|
if type == 'avatar':
|
|
px = image.size[0] if image.size[0] < image.size[1] else image.size[1]
|
|
image = image.crop((0, 0, px, px))
|
|
image.thumbnail((n*w, image.size[1]))
|
|
image.save(img_path, ext, save_all=True)
|
|
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
|
|
|
|
|
|
# 获取非标准类原尺寸图
|
|
@router.get("/{type}-{id}-{version}.{ext}", summary="获取文章缩略图", description="/img/233@1x320.webp 通过webp后缀获取webp格式图片, 通过@1x320后缀获取1x缩略图, 通过@2x320后缀获取2x缩略图, 通过@3x320后缀获取3x缩略图")
|
|
def get_image_type(type:str, id:str, version:str, ext:str):
|
|
with pool.connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
img_path = f"{IMAGES_PATH}/{type}-{id}-{version}.{ext}"
|
|
if os.path.exists(img_path):
|
|
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
|
|
if type == 'ad' or type == 'article' or type == 'article_attribute':
|
|
count = cursor.execute(f"SELECT * FROM `web_{type}` WHERE `id`={id}")
|
|
img = cursor.fetchone()
|
|
if img is None:
|
|
print('图片不存在:', count)
|
|
return Response('图片不存在', status_code=404)
|
|
url = img['image']
|
|
elif type == 'url':
|
|
id = unquote(id, 'utf-8')
|
|
id = id.replace(' ','+')
|
|
url = unquote(base64.b64decode(id))
|
|
print("url:", url)
|
|
elif type == 'avatar':
|
|
count = cursor.execute(f"SELECT avatar FROM `web_member` WHERE `id`={id}")
|
|
user = cursor.fetchone()
|
|
if user is None:
|
|
print('用户不存在:', count)
|
|
return Response('用户不存在', status_code=404)
|
|
url = user[0]
|
|
else:
|
|
print('图片类型不存在:', type)
|
|
return Response('图片类型不存在', status_code=404)
|
|
image = download_image(url)
|
|
image.save(img_path, ext, save_all=True)
|
|
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
|
|
|
|
|
|
# 通过url获取图片
|
|
@router.get("/url-{url}@{n}x{w}.{ext}", summary="通过url获取图片", description="/img/article-233.webp")
|
|
def get_image_url(url:str, n:int, w:int, ext:str):
|
|
img_path = f"{IMAGES_PATH}/{type}-{url}.{ext}"
|
|
if os.path.exists(img_path):
|
|
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
|
|
url = unquote(url, 'utf-8').replace(' ','+')
|
|
url = unquote(base64.b64decode(url))
|
|
image = download_image(url)
|
|
if not image:
|
|
return Response('图片不存在', status_code=404)
|
|
image.thumbnail((n*w, image.size[1]))
|
|
image.save(img_path, ext, save_all=True)
|
|
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
|
|
|
|
|
|
# 获取标准缩略图(带版本号)
|
|
@router.get("/{id}-{version}@{n}x{w}.{ext}", summary="获取缩略图(带版本号)", description="/img/233@1x320.webp 通过webp后缀获取webp格式图片, 通过@1x320后缀获取1x缩略图, 通过@2x320后缀获取2x缩略图, 通过@3x320后缀获取3x缩略图")
|
|
def get_image_thumbnail(id:int, version:str, n:int, w:int, ext:str):
|
|
with pool.connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
# 判断图片是否已经生成
|
|
img_path = f"{IMAGES_PATH}/{id}-{version}@{n}x{w}.{ext}"
|
|
if os.path.exists(img_path):
|
|
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
|
|
# 从数据库获取原图地址
|
|
cursor.execute(f"SELECT content FROM `web_images` WHERE `id`={id}")
|
|
img = cursor.fetchone()
|
|
if img is None:
|
|
print('图片不存在:', id)
|
|
return Response('图片不存在', status_code=404)
|
|
image = download_image(img[0])
|
|
if not image:
|
|
return Response('图片不存在', status_code=404)
|
|
image.thumbnail((n*w, image.size[1]))
|
|
image.save(img_path, ext, save_all=True)
|
|
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
|
|
|
|
|
|
# 获取标准缩略图
|
|
@router.get("/{id}@{n}x{w}.{ext}", summary="获取缩略图", description="/img/233@1x320.webp 通过webp后缀获取webp格式图片, 通过@1x320后缀获取1x缩略图, 通过@2x320后缀获取2x缩略图, 通过@3x320后缀获取3x缩略图")
|
|
def get_image_thumbnail(id:int, n:int, w:int, ext:str):
|
|
with pool.connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
# 判断图片是否已经生成
|
|
img_path = f"{IMAGES_PATH}/{id}@{n}x{w}.{ext}"
|
|
if os.path.exists(img_path):
|
|
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
|
|
# 从数据库获取原图地址
|
|
cursor.execute(f"SELECT * FROM `web_images` WHERE `id`={id}")
|
|
img = cursor.fetchone()
|
|
if img is None:
|
|
print('图片不存在:', id)
|
|
return Response('图片不存在', status_code=404)
|
|
image = download_image(img['content'])
|
|
if not image:
|
|
return Response('图片不存在', status_code=404)
|
|
image.thumbnail((n*w, image.size[1]))
|
|
image.save(img_path, ext, save_all=True)
|
|
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
|
|
|
|
|
|
# 获取标准原尺寸图
|
|
@router.get("/{id}.{ext}", summary="获取标准原尺寸图", description="/img/233@1x320.webp 通过webp后缀获取webp格式图片, 无后缀获取原图")
|
|
def get_image(id: int = 824, ext: str = 'webp'):
|
|
with pool.connection() as conn:
|
|
with conn.cursor() as cursor:
|
|
# 判断图片是否已经生成
|
|
img_path = f"{IMAGES_PATH}/{id}.{ext}"
|
|
if os.path.exists(img_path):
|
|
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
|
|
# 从数据库获取原图地址
|
|
cursor.execute(f"SELECT * FROM `web_images` WHERE `id`={id}")
|
|
img = cursor.fetchone()
|
|
if img is None:
|
|
print('图片不存在:', id)
|
|
return Response('图片不存在', status_code=404)
|
|
image = download_image(img['content'])
|
|
if not image:
|
|
return Response('图片不存在', status_code=404)
|
|
image.save(img_path, ext, save_all=True)
|
|
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
|