from sse_starlette.sse import EventSourceResponse
from starlette.responses import JSONResponse, FileResponse
from fastapi import FastAPI, Request
import gradio as gr
import requests
import argparse
import aiohttp
import uvicorn
import random
import string
import base64
import json
import sys
import os
# --- === CONFIG === ---
IMAGE_HANDLE = "url"  # or "base64"
API_BASE = "openai"  # or "env"
api_key = os.environ['OPENAI_API_KEY']
base_url = os.environ.get('OPENAI_BASE_URL', "https://api.openai.com/v1")
# --- === CONFIG === ---
if API_BASE == "env": | |
try: | |
response = requests.get(f"{base_url}/models", headers={"Authorization": f"Bearer {api_key}"}) | |
response.raise_for_status() | |
models = response.json() | |
if not ('data' in models): | |
base_url = "https://api.openai.com/v1" | |
except Exception as e: | |
print(f"Error testing API endpoint: {e}") | |
else: | |
base_url = "https://api.openai.com/v1" | |
async def streamChat(params):
    async with aiohttp.ClientSession() as session:
        async with session.post(f"{base_url}/chat/completions", headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, json=params) as r:
            r.raise_for_status()
            async for line in r.content:
                if line:
                    line_str = line.decode('utf-8')
                    # SSE frames look like "data: {...}"; the stream ends with "data: [DONE]".
                    if line_str.startswith("data: "):
                        line_str = line_str[6:].strip()
                        if line_str == "[DONE]":
                            continue
                        try:
                            message = json.loads(line_str)
                            yield message
                        except json.JSONDecodeError:
                            continue
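# rnd: random alphanumeric string, used as a throwaway per-request "user" identifier.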
def rnd(length=8):
    letters = string.ascii_letters + string.digits
    return ''.join(random.choice(letters) for _ in range(length))
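# getModels: list the endpoint's models, keeping only chat-capable GPT models for the UI dropdown.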
def getModels():
    response = requests.get(f"{base_url}/models", headers={"Authorization": f"Bearer {api_key}"})
    response.raise_for_status()
    models = response.json()
    return sorted([
        model['id'] for model in models['data']
        if 'gpt' in model['id'] and model['id'] not in {"gpt-3.5-turbo-instruct", "gpt-3.5-turbo-instruct-0914"}
    ])
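# handleMultimodalData: convert a Gradio message (plain string or multimodal data with
# attachments) into an OpenAI chat-completions message dict for the given role.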
# Models that accept image inputs; every other model receives text-only content.
VISION_MODELS = {"gpt-4-1106-vision-preview", "gpt-4-vision-preview", "gpt-4-turbo", "gpt-4o", "gpt-4o-2024-05-13", "gpt-4o-mini", "gpt-4o-mini-2024-07-18"}
def handleMultimodalData(model, role, data):
    # Plain strings (e.g. text-only history entries) need no special handling.
    if isinstance(data, str):
        return {"role": role, "content": data}
    # Message with attachments, going to a vision-capable model: images become
    # image_url parts, text-like files are inlined between sentinel tags.
    elif hasattr(data, 'files') and data.files and len(data.files) > 0 and model in VISION_MODELS:
        result, handler, hasFoundFile = [], ["[System: This message contains files; the system will be splitting it.]"], False
        for file in data.files:
            if file.mime_type.startswith("image/"):
                if IMAGE_HANDLE == "base64":
                    with open(file.path, "rb") as image_file:
                        result.append({"type": "image_url", "image_url": {"url": "data:" + file.mime_type + ";base64," + base64.b64encode(image_file.read()).decode('utf-8')}})
                else:
                    result.append({"type": "image_url", "image_url": {"url": file.url}})
            if file.mime_type.startswith("text/") or file.mime_type.startswith("application/"):
                hasFoundFile = True
                with open(file.path, "rb") as data_file:
                    handler.append("<|file_start|>" + file.orig_name + "\n" + data_file.read().decode('utf-8') + "<|file_end|>")
        if hasFoundFile:
            handler.append(data.text)
            return {"role": role, "content": [{"type": "text", "text": "\n\n".join(handler)}] + result}
        else:
            return {"role": role, "content": [{"type": "text", "text": data.text}] + result}
    # Message with attachments, going to a text-only model: inline text-like files, drop images.
    elif hasattr(data, 'files') and data.files and len(data.files) > 0:
        handler, hasFoundFile = ["[System: This message contains files; the system will be splitting it.]"], False
        for file in data.files:
            if file.mime_type.startswith("text/") or file.mime_type.startswith("application/"):
                hasFoundFile = True
                with open(file.path, "rb") as data_file:
                    handler.append("<|file_start|>" + file.orig_name + "\n" + data_file.read().decode('utf-8') + "<|file_end|>")
        if hasFoundFile:
            handler.append(data.text)
            return {"role": role, "content": "\n\n".join(handler)}
        else:
            return {"role": role, "content": data.text}
    else:
        # Tuples (Gradio file-history entries) and anything else fall back to plain text.
        if isinstance(data, tuple):
            return {"role": role, "content": str(data)}
        return {"role": role, "content": getattr(data, 'text', str(data))}
async def respond(
    message,
    history: list[tuple[str, str]],
    system_message,
    model_name,
    max_tokens,
    temperature,
    top_p,
    seed,
    random_seed
):
    messages = [{"role": "system", "content": "If the user submits a file, it is visible for that turn only. This is not for privacy reasons but due to the developer's laziness; ask the user to upload the file again if they ask a follow-up question without the data."}, {"role": "system", "content": system_message}]
    for val in history:
        if val[0]:
            messages.append(handleMultimodalData(model_name, "user", val[0]))
        if val[1]:
            messages.append(handleMultimodalData(model_name, "assistant", val[1]))
    messages.append(handleMultimodalData(model_name, "user", message))
    response = ""
    completion = streamChat({
        "model": model_name,
        "messages": messages,
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "seed": (random.randint(0, 2**32) if random_seed else seed),
        "user": rnd(),
        "stream": True
    })
    # Accumulate the delta contents and yield the running text so the UI updates live.
    async for token in completion:
        response += token['choices'][0]['delta'].get("content", "")
        yield response
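# Gradio UI: a multimodal chat interface with model selection and sampling controls.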
demo = gr.ChatInterface(
    respond,
    title="GPT-4O-mini",
    description="A simple proxy to OpenAI!<br/>You can use this space as a proxy! Click [here](/api/v1/docs) to view the API documentation.<br/>Images can only be submitted to vision/4o models, but txt/code/etc. files can be submitted to all models.<br/>###### File uploads are only shown to the model for one round, because of how Gradio handles history.",
    multimodal=True,
    additional_inputs=[
        gr.Textbox(value="You are a helpful assistant.", label="System message"),
        gr.Dropdown(choices=getModels(), value="gpt-4o-mini-2024-07-18", label="Model"),
        gr.Slider(minimum=1, maximum=4096, value=4096, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.1, maximum=2.0, value=0.7, step=0.05, label="Temperature"),
        gr.Slider(
            minimum=0.05,
            maximum=1.0,
            value=0.95,
            step=0.05,
            label="Top-p (nucleus sampling)",
        ),
        gr.Slider(minimum=0, maximum=2**32, value=0, step=1, label="Seed"),
        gr.Checkbox(label="Randomize Seed", value=True),
    ],
)
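# FastAPI app: exposes a small OpenAI-style REST layer alongside the Gradio UI.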
app = FastAPI()
# The handlers below had no route decorators; the UI description links to /api/v1/docs,
# and the remaining paths are assumed to mirror the OpenAI API under /api/v1.
@app.get("/api/v1/docs")
def html():
    return FileResponse("index.html")
@app.get("/api/v1/models")
async def test_endpoint():
    response = requests.get(f"{base_url}/models", headers={"Authorization": f"Bearer {api_key}"})
    response.raise_for_status()
    models = response.json()
    # Mirror the dropdown filter: chat-capable GPT models only, sorted by id.
    models['data'] = sorted(
        [model for model in models['data'] if 'gpt' in model['id'] and model['id'] not in {"gpt-3.5-turbo-instruct", "gpt-3.5-turbo-instruct-0914"}],
        key=lambda x: x['id']
    )
    return JSONResponse(content=models)
@app.post("/api/v1/chat/completions")
async def chat_completion(request: Request):
    try:
        body = await request.json()
        if not body.get("messages") or not body.get("model"):
            return JSONResponse(content={"error": {"code": "MISSING_VALUE", "message": "Both 'messages' and 'model' are required fields."}}, status_code=400)
        # Forward only recognized OpenAI parameters, dropping any the client did not supply.
        params = {
            key: value for key, value in {
                "model": body.get("model"),
                "messages": body.get("messages"),
                "max_tokens": body.get("max_tokens"),
                "temperature": body.get("temperature"),
                "top_p": body.get("top_p"),
                "frequency_penalty": body.get("frequency_penalty"),
                "logit_bias": body.get("logit_bias"),
                "logprobs": body.get("logprobs"),
                "top_logprobs": body.get("top_logprobs"),
                "n": body.get("n"),
                "presence_penalty": body.get("presence_penalty"),
                "response_format": body.get("response_format"),
                "seed": body.get("seed"),
                "service_tier": body.get("service_tier"),
                "stop": body.get("stop"),
                "stream": body.get("stream"),
                "stream_options": body.get("stream_options"),
                "tools": body.get("tools"),
                "tool_choice": body.get("tool_choice"),
                "parallel_tool_calls": body.get("parallel_tool_calls"),
                "user": rnd(),
            }.items() if value is not None
        }
        if body.get("stream"):
            # Re-emit the upstream stream as server-sent events.
            async def event_generator():
                async for event in streamChat(params):
                    yield json.dumps(event)
            return EventSourceResponse(event_generator())
        else:
            response = requests.post(f"{base_url}/chat/completions", headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, json=params)
            response.raise_for_status()
            completion = response.json()
            return JSONResponse(content=completion)
    except Exception as e:
        return JSONResponse(content={"error": {"code": "SERVER_ERROR", "message": str(e)}}, status_code=400)
app = gr.mount_gradio_app(app, demo, path="/")
class ArgParser(argparse.ArgumentParser):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.add_argument("-s", "--server", type=str, default="0.0.0.0")
        self.add_argument("-p", "--port", type=int, default=7860)
        self.add_argument("-d", "--dev", default=False, action="store_true")
        self.args = self.parse_args(sys.argv[1:])
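# Entry point: --dev enables uvicorn auto-reload for local development.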
if __name__ == "__main__":
    args = ArgParser().args
    uvicorn.run("__main__:app", host=args.server, port=args.port, reload=args.dev)