from typing import Optional from fastapi import APIRouter, HTTPException, Header from models.mysql import pool router = APIRouter() # 获取当前用户收藏记录 @router.get('', summary='自己的收藏记录', description='获取自己的收藏记录, 用于判断是否收藏(headers中必须附带token)') def get_self_collect(token: Optional[str] = Header()): with pool.connection() as conn: with conn.cursor() as cursor: # 查询用户ID cursor.execute(f"SELECT user_id FROM web_auth WHERE token={token} limit 1") data = cursor.fetchone() print('auth: ', data) if not data: raise HTTPException(status_code=401, detail="用户未登录") user_id = data['user_id'] # 查询收藏记录 cursor.execute(f"SELECT content FROM web_collect WHERE user_id={user_id} AND type='1'") data = cursor.fetchall() # 获取所有记录列表 data = [str(item['content']) for item in data] # 转换为数组 # 查询图片ID(对特殊字符安全转义) cursor.execute(f"SELECT id FROM web_images WHERE content IN ({','.join(['%s'] * len(data))})", data) data = cursor.fetchall() data = [str(item['id']) for item in data] return {'code': 0, 'user_id': user_id, 'total': len(data), 'data': data } # 获取指定用户收藏记录 @router.get('/{user_id}', summary='指定用的户收藏记录', description='获取指定用户收藏记录(仅测试用)') def get_user_collect(user_id: int): with pool.connection() as conn: with conn.cursor() as cursor: cursor.execute(f"SELECT content FROM web_collect WHERE user_id={user_id} AND type=1") data = cursor.fetchall() # 获取所有记录列表 data = [str(item['content']) for item in data] # 转换为数组 if not data: return {'code': 0, 'user_id': user_id, 'total': 0, 'data': []} # 查询图片ID(对特殊字符安全转义) cursor.execute(f"SELECT id FROM web_images WHERE content IN ({','.join(['%s'] * len(data))})", data) data = cursor.fetchall() data = [str(item['id']) for item in data] return {'code': 0, 'user_id': user_id, 'total': len(data), 'data': data}