import os import time import base64 import psutil import statistics import _thread as thread from fastapi import APIRouter, HTTPException, Response from urllib.parse import unquote from configs.config import IMAGES_PATH from models.mysql import pool from utilities.download import download_image, generate_thumbnail router = APIRouter() # 预热图片(获取一次图片, 遍历图片表, 检查OSS中所有被预定的尺寸是否存在, 不存在则生成) @router.get("/warm", summary="预热图片", description="预热图片") def warm_image(op:int=0, end:int=10, version:str='0'): with pool.connection() as conn: with conn.cursor() as cursor: cursor.execute(f"SELECT * FROM `web_images` limit {op}, {end}") for img in cursor.fetchall(): # 如果CPU使用率大于50%, 则等待, 直到CPU使用率小于50% while statistics.mean(psutil.cpu_percent(interval=1, percpu=True)) > 50: print(statistics.mean(psutil.cpu_percent(interval=1, percpu=True)), '等待CPU释放...') time.sleep(2) # 如果内存剩余小于1G, 则等待, 直到内存剩余大于1G while psutil.virtual_memory().available < 1024 * 1024 * 1024: print(psutil.virtual_memory().available, '等待内存释放...') time.sleep(2) # CPU使用率已降低, 开始处理图片 image = download_image(img['content']) # 从OSS下载原图 if not image: print('跳过不存在的图片:', img['content']) continue # 创建新线程处理图片 try: print('开始处理图片:', img['content']) thread.start_new_thread(generate_thumbnail, (image, img['id'], version, 3, 328, 'webp')) thread.start_new_thread(generate_thumbnail, (image, img['id'], version, 2, 328, 'webp')) thread.start_new_thread(generate_thumbnail, (image, img['id'], version, 1, 328, 'webp')) except: print('无法启动线程') return Response('预热成功', status_code=200, media_type='text/plain', headers={'Content-Type': 'text/plain; charset=utf-8'}) # 获取非标准类缩略图 @router.get("/{type}-{id}-{version}@{n}x{w}.{ext}", summary="获取非标准类缩略图", description="/img/article-233-version@1x320.webp 通过webp后缀获取webp格式图片, 通过@1x320后缀获取1x缩略图, 通过@2x320后缀获取2x缩略图, 通过@3x320后缀获取3x缩略图") def get_image_type_thumbnail(type:str, id:str, version:str, n:int, w:int, ext:str): with pool.connection() as conn: with conn.cursor() as cursor: img_path = f"{IMAGES_PATH}/{type}-{id}-{version}@{n}x{w}.{ext}" if os.path.exists(img_path): return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}") if type == 'ad' or type == 'article' or type == 'article_attribute': count = cursor.execute(f"SELECT * FROM `web_{type}` WHERE `id`={id}") img = cursor.fetchone() if img is None: print('图片不存在:', count) return Response('图片不存在', status_code=404) url = img['image'] elif type == 'url': id = unquote(id, 'utf-8') id = id.replace(' ','+') url = unquote(base64.b64decode(id)) print(url) elif type == 'avatar': count = cursor.execute(f"SELECT avatar FROM `web_member` WHERE `id`={id}") user = cursor.fetchone() if user is None: print('用户不存在:', count) return Response('用户不存在', status_code=404) url = user['avatar'] else: print('图片类型不存在:', type) return Response('图片类型不存在', status_code=404) image = download_image(url) if not image: return Response('图片不存在', status_code=404) # 如果是 avatar, 则裁剪为正方形 if type == 'avatar': px = image.size[0] if image.size[0] < image.size[1] else image.size[1] image = image.crop((0, 0, px, px)) image.thumbnail((n*w, image.size[1])) image.save(img_path, ext, save_all=True) return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}") # 获取非标准类原尺寸图 @router.get("/{type}-{id}-{version}.{ext}", summary="获取文章缩略图", description="/img/233@1x320.webp 通过webp后缀获取webp格式图片, 通过@1x320后缀获取1x缩略图, 通过@2x320后缀获取2x缩略图, 通过@3x320后缀获取3x缩略图") def get_image_type(type:str, id:str, version:str, ext:str): with pool.connection() as conn: with conn.cursor() as cursor: img_path = f"{IMAGES_PATH}/{type}-{id}-{version}.{ext}" if os.path.exists(img_path): return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}") if type == 'ad' or type == 'article' or type == 'article_attribute': count = cursor.execute(f"SELECT * FROM `web_{type}` WHERE `id`={id}") img = cursor.fetchone() if img is None: print('图片不存在:', count) return Response('图片不存在', status_code=404) url = img['image'] elif type == 'url': id = unquote(id, 'utf-8') id = id.replace(' ','+') url = unquote(base64.b64decode(id)) print("url:", url) elif type == 'avatar': count = cursor.execute(f"SELECT avatar FROM `web_member` WHERE `id`={id}") user = cursor.fetchone() if user is None: print('用户不存在:', count) return Response('用户不存在', status_code=404) url = user['avatar'] else: print('图片类型不存在:', type) return Response('图片类型不存在', status_code=404) image = download_image(url) image.save(img_path, ext, save_all=True) return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}") # 通过url获取图片 @router.get("/url-{url}@{n}x{w}.{ext}", summary="通过url获取图片", description="/img/article-233.webp") def get_image_url(url:str, n:int, w:int, ext:str): img_path = f"{IMAGES_PATH}/{type}-{url}.{ext}" if os.path.exists(img_path): return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}") url = unquote(url, 'utf-8').replace(' ','+') url = unquote(base64.b64decode(url)) image = download_image(url) if not image: return Response('图片不存在', status_code=404) image.thumbnail((n*w, image.size[1])) image.save(img_path, ext, save_all=True) return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}") # 获取标准缩略图(带版本号) @router.get("/{id}-{version}@{n}x{w}.{ext}", summary="获取缩略图(带版本号)", description="/img/233@1x320.webp 通过webp后缀获取webp格式图片, 通过@1x320后缀获取1x缩略图, 通过@2x320后缀获取2x缩略图, 通过@3x320后缀获取3x缩略图") def get_image_thumbnail(id:int, version:str, n:int, w:int, ext:str): with pool.connection() as conn: with conn.cursor() as cursor: # 判断图片是否已经生成 img_path = f"{IMAGES_PATH}/{id}-{version}@{n}x{w}.{ext}" if os.path.exists(img_path): return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}") # 从数据库获取原图地址 cursor.execute(f"SELECT * FROM `web_images` WHERE `id`={id}") img = cursor.fetchone() if img is None: print('图片不存在:', id) return Response('图片不存在', status_code=404) image = download_image(img['content']) if not image: return Response('图片不存在', status_code=404) image.thumbnail((n*w, image.size[1])) image.save(img_path, ext, save_all=True) return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}") # 获取标准缩略图 @router.get("/{id}@{n}x{w}.{ext}", summary="获取缩略图", description="/img/233@1x320.webp 通过webp后缀获取webp格式图片, 通过@1x320后缀获取1x缩略图, 通过@2x320后缀获取2x缩略图, 通过@3x320后缀获取3x缩略图") def get_image_thumbnail(id:int, n:int, w:int, ext:str): with pool.connection() as conn: with conn.cursor() as cursor: # 判断图片是否已经生成 img_path = f"{IMAGES_PATH}/{id}@{n}x{w}.{ext}" if os.path.exists(img_path): return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}") # 从数据库获取原图地址 cursor.execute(f"SELECT * FROM `web_images` WHERE `id`={id}") img = cursor.fetchone() if img is None: print('图片不存在:', id) return Response('图片不存在', status_code=404) image = download_image(img['content']) if not image: return Response('图片不存在', status_code=404) image.thumbnail((n*w, image.size[1])) image.save(img_path, ext, save_all=True) return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}") # 获取标准原尺寸图 @router.get("/{id}.{ext}", summary="获取标准原尺寸图", description="/img/233@1x320.webp 通过webp后缀获取webp格式图片, 无后缀获取原图") def get_image(id: int = 824, ext: str = 'webp'): with pool.connection() as conn: with conn.cursor() as cursor: # 判断图片是否已经生成 img_path = f"{IMAGES_PATH}/{id}.{ext}" if os.path.exists(img_path): return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}") # 从数据库获取原图地址 cursor.execute(f"SELECT * FROM `web_images` WHERE `id`={id}") img = cursor.fetchone() if img is None: print('图片不存在:', id) return Response('图片不存在', status_code=404) image = download_image(img['content']) if not image: return Response('图片不存在', status_code=404) image.save(img_path, ext, save_all=True) return Response(content=open(img_path, 'rb').read(), media_type=f"image/{ext}")