Files
reverse_image_search_gpu/routers/img.py
2024-11-11 17:55:52 +08:00

208 lines
11 KiB
Python

import os
import time
import base64
import psutil
import statistics
import _thread as thread
from fastapi import APIRouter, HTTPException, Response
from urllib.parse import unquote
from configs.config import IMAGES_PATH
from models.mysql import pool
from utilities.download import download_image, generate_thumbnail
router = APIRouter()
# 预热图片(获取一次图片, 遍历图片表, 检查OSS中所有被预定的尺寸是否存在, 不存在则生成)
@router.get("/warm", summary="预热图片", description="预热图片")
def warm_image(op:int=0, end:int=10, version:str='0'):
with pool.connection() as conn:
with conn.cursor() as cursor:
cursor.execute(f"SELECT * FROM `web_images` limit {op}, {end}")
for img in cursor.fetchall():
# 如果CPU使用率大于50%, 则等待, 直到CPU使用率小于50%
while statistics.mean(psutil.cpu_percent(interval=1, percpu=True)) > 50:
print(statistics.mean(psutil.cpu_percent(interval=1, percpu=True)), '等待CPU释放...')
time.sleep(2)
# 如果内存剩余小于1G, 则等待, 直到内存剩余大于1G
while psutil.virtual_memory().available < 1024 * 1024 * 1024:
print(psutil.virtual_memory().available, '等待内存释放...')
time.sleep(2)
# CPU使用率已降低, 开始处理图片
image = download_image(img['content']) # 从OSS下载原图
if not image:
print('跳过不存在的图片:', img['content'])
continue
# 创建新线程处理图片
try:
print('开始处理图片:', img['content'])
thread.start_new_thread(generate_thumbnail, (image, img['id'], version, 3, 328, 'webp'))
thread.start_new_thread(generate_thumbnail, (image, img['id'], version, 2, 328, 'webp'))
thread.start_new_thread(generate_thumbnail, (image, img['id'], version, 1, 328, 'webp'))
except:
print('无法启动线程')
return Response('预热成功', status_code=200, media_type='text/plain', headers={'Content-Type': 'text/plain; charset=utf-8'})
# 获取非标准类缩略图
@router.get("/{type}-{id}-{version}@{n}x{w}.{ext}", summary="获取非标准类缩略图", description="/img/article-233-version@1x320.webp 通过webp后缀获取webp格式图片, 通过@1x320后缀获取1x缩略图, 通过@2x320后缀获取2x缩略图, 通过@3x320后缀获取3x缩略图")
def get_image_type_thumbnail(type:str, id:str, version:str, n:int, w:int, ext:str):
with pool.connection() as conn:
with conn.cursor() as cursor:
img_path = f"{IMAGES_PATH}/{type}-{id}-{version}@{n}x{w}.{ext}"
if os.path.exists(img_path):
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
if type == 'ad' or type == 'article' or type == 'article_attribute':
count = cursor.execute(f"SELECT image FROM `web_{type}` WHERE `id`={id}")
img = cursor.fetchone()
if img is None:
print('图片不存在:', count)
return Response('图片不存在', status_code=404)
url = img[0]
elif type == 'url':
id = unquote(id, 'utf-8')
id = id.replace(' ','+')
url = unquote(base64.b64decode(id))
print(url)
elif type == 'avatar':
count = cursor.execute(f"SELECT avatar FROM `web_member` WHERE `id`={id}")
user = cursor.fetchone()
if user is None:
print('用户不存在:', count)
return Response('用户不存在', status_code=404)
url = user[0]
else:
print('图片类型不存在:', type)
return Response('图片类型不存在', status_code=404)
image = download_image(url)
if not image:
return Response('图片不存在', status_code=404)
# 如果是 avatar, 则裁剪为正方形
if type == 'avatar':
px = image.size[0] if image.size[0] < image.size[1] else image.size[1]
image = image.crop((0, 0, px, px))
image.thumbnail((n*w, image.size[1]))
image.save(img_path, ext, save_all=True)
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
# 获取非标准类原尺寸图
@router.get("/{type}-{id}-{version}.{ext}", summary="获取文章缩略图", description="/img/233@1x320.webp 通过webp后缀获取webp格式图片, 通过@1x320后缀获取1x缩略图, 通过@2x320后缀获取2x缩略图, 通过@3x320后缀获取3x缩略图")
def get_image_type(type:str, id:str, version:str, ext:str):
with pool.connection() as conn:
with conn.cursor() as cursor:
img_path = f"{IMAGES_PATH}/{type}-{id}-{version}.{ext}"
if os.path.exists(img_path):
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
if type == 'ad' or type == 'article' or type == 'article_attribute':
count = cursor.execute(f"SELECT image FROM `web_{type}` WHERE `id`={id}")
img = cursor.fetchone()
if img is None:
print('图片不存在:', count)
return Response('图片不存在', status_code=404)
url = img[0]
elif type == 'url':
id = unquote(id, 'utf-8')
id = id.replace(' ','+')
url = unquote(base64.b64decode(id))
print("url:", url)
elif type == 'avatar':
count = cursor.execute(f"SELECT avatar FROM `web_member` WHERE `id`={id}")
user = cursor.fetchone()
if user is None:
print('用户不存在:', count)
return Response('用户不存在', status_code=404)
url = user[0]
else:
print('图片类型不存在:', type)
return Response('图片类型不存在', status_code=404)
image = download_image(url)
image.save(img_path, ext, save_all=True)
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
# 通过url获取图片
@router.get("/url-{url}@{n}x{w}.{ext}", summary="通过url获取图片", description="/img/article-233.webp")
def get_image_url(url:str, n:int, w:int, ext:str):
img_path = f"{IMAGES_PATH}/{type}-{url}.{ext}"
if os.path.exists(img_path):
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
url = unquote(url, 'utf-8').replace(' ','+')
url = unquote(base64.b64decode(url))
image = download_image(url)
if not image:
return Response('图片不存在', status_code=404)
image.thumbnail((n*w, image.size[1]))
image.save(img_path, ext, save_all=True)
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
# 获取标准缩略图(带版本号)
@router.get("/{id}-{version}@{n}x{w}.{ext}", summary="获取缩略图(带版本号)", description="/img/233@1x320.webp 通过webp后缀获取webp格式图片, 通过@1x320后缀获取1x缩略图, 通过@2x320后缀获取2x缩略图, 通过@3x320后缀获取3x缩略图")
def get_image_thumbnail(id:int, version:str, n:int, w:int, ext:str):
with pool.connection() as conn:
with conn.cursor() as cursor:
# 判断图片是否已经生成
img_path = f"{IMAGES_PATH}/{id}-{version}@{n}x{w}.{ext}"
if os.path.exists(img_path):
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
# 从数据库获取原图地址
cursor.execute(f"SELECT content FROM `web_images` WHERE `id`={id}")
img = cursor.fetchone()
if img is None:
print('图片不存在:', id)
return Response('图片不存在', status_code=404)
image = download_image(img[0])
if not image:
return Response('图片不存在', status_code=404)
image.thumbnail((n*w, image.size[1]))
image.save(img_path, ext, save_all=True)
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
# 获取标准缩略图
@router.get("/{id}@{n}x{w}.{ext}", summary="获取缩略图", description="/img/233@1x320.webp 通过webp后缀获取webp格式图片, 通过@1x320后缀获取1x缩略图, 通过@2x320后缀获取2x缩略图, 通过@3x320后缀获取3x缩略图")
def get_image_thumbnail(id:int, n:int, w:int, ext:str):
with pool.connection() as conn:
with conn.cursor() as cursor:
# 判断图片是否已经生成
img_path = f"{IMAGES_PATH}/{id}@{n}x{w}.{ext}"
if os.path.exists(img_path):
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
# 从数据库获取原图地址
cursor.execute(f"SELECT * FROM `web_images` WHERE `id`={id}")
img = cursor.fetchone()
if img is None:
print('图片不存在:', id)
return Response('图片不存在', status_code=404)
image = download_image(img['content'])
if not image:
return Response('图片不存在', status_code=404)
image.thumbnail((n*w, image.size[1]))
image.save(img_path, ext, save_all=True)
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
# 获取标准原尺寸图
@router.get("/{id}.{ext}", summary="获取标准原尺寸图", description="/img/233@1x320.webp 通过webp后缀获取webp格式图片, 无后缀获取原图")
def get_image(id: int = 824, ext: str = 'webp'):
with pool.connection() as conn:
with conn.cursor() as cursor:
# 判断图片是否已经生成
img_path = f"{IMAGES_PATH}/{id}.{ext}"
if os.path.exists(img_path):
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")
# 从数据库获取原图地址
cursor.execute(f"SELECT * FROM `web_images` WHERE `id`={id}")
img = cursor.fetchone()
if img is None:
print('图片不存在:', id)
return Response('图片不存在', status_code=404)
image = download_image(img['content'])
if not image:
return Response('图片不存在', status_code=404)
image.save(img_path, ext, save_all=True)
return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")