from starlette.responses import JSONResponse, FileResponse, HTMLResponse
from gradio.data_classes import FileData, GradioModel
from sse_starlette.sse import EventSourceResponse
from typing import (List, Tuple, Optional)
from fastapi import FastAPI, Request
import gradio as gr
import mimetypes
import threading
import requests
import argparse
import asyncio
import aiohttp
import uvicorn
import random
import string
import base64
import json
import time
import math
import sys
import os

# --- === CONFIG === ---
ENV_HANDLE = "env"  # or "url on env"
IMAGE_HANDLE = "url"  # or "base64"
API_BASE = "env"  # or "openai"
GRADIO_URL_PATH = ""
api_key = os.environ['API_API_KEY']
oai_api_key = os.environ['OPENAI_API_KEY']
base_url = os.environ.get('OPENAI_BASE_URL', "")

# NOTE(review): the fallback URL literals in this file were empty strings, which can
# never succeed. They are only ever used together with `oai_api_key`, so the official
# OpenAI endpoint is presumed here -- confirm against the upstream copy.
OPENAI_FALLBACK_URL = "https://api.openai.com/v1"
# Seconds to wait on control-plane calls (key checks, env reload, moderation).
REQUEST_TIMEOUT = 30
# The search prompt warns that a query can take 30+ seconds, so be generous.
SEARCH_TIMEOUT = 120
# Seconds between two reloads of the remote environment file.
ENV_REFRESH_SECONDS = 180
# Models that reject sampling parameters, system messages and streaming.
O1_MODELS = frozenset({"o1-mini", "o1-mini-2024-09-12", "o1-preview", "o1-preview-2024-09-12"})

# O1-mini and O1-preview are not in the default list because they require a TIER-5 subscription on OpenAI's API.
# To add O1, uncomment the next line and comment out the active `def_models` below it.
# def_models = '["gpt-4", "gpt-4-0125-preview", "gpt-4-0613", "gpt-4-1106-preview", "gpt-4-turbo", "gpt-4-turbo-2024-04-09", "gpt-4-turbo-preview", "chatgpt-4o-latest", "gpt-4o", "gpt-4o-2024-05-13", "gpt-4o-2024-08-06", "gpt-4o-mini", "gpt-4o-mini-2024-07-18", "o1-mini", "o1-mini-2024-09-12", "o1-preview", "o1-preview-2024-09-12"]'
def_models = '["gpt-4", "gpt-4-0125-preview", "gpt-4-0613", "gpt-4-1106-preview", "gpt-4-turbo", "gpt-4-turbo-2024-04-09", "gpt-4-turbo-preview", "chatgpt-4o-latest", "gpt-4o", "gpt-4o-2024-05-13", "gpt-4o-2024-08-06", "gpt-4o-mini", "gpt-4o-mini-2024-07-18"]'
vision_models = {"gpt-4-turbo", "chatgpt-4o-latest", "gpt-4o", "gpt-4o-2024-05-13", "gpt-4o-2024-08-06", "gpt-4o-mini", "gpt-4o-mini-2024-07-18"}

# NOTE(review): the two prompts below are reproduced exactly as found. Their original
# line breaks appear to have been lost; since tool calls are detected one line at a
# time, restore the line structure from the upstream copy if it is available.
calcPrompt = """[System: You have ability to calculate math problems (formated on python), via tools provided to you by system. To call a tool you need to write a json in a empty line; like writing it at the end of message. To use calculator; you need to follow this example JSON: {"tool": "calc", "isCall": true, "prompt": "math.pi * 5"} > 'tool' variable is used to define which tool you are calling > 'isCall' used to confirm that you are calling that function and not showing it for example > 'prompt' the math that will be done via python. Here's few more example so you can under stand better To show as an example> {"tool": "calc", "isCall": false, "prompt": "math.sqrt(16)"} {"tool": "calc", "isCall": false, "prompt": "math.pow(2, 3)"} {"tool": "calc", "isCall": false, "prompt": "math.sin(math.pi / 2)"} To actually use the tool> {"tool": "calc", "isCall": true, "prompt": "math.factorial(5)"} {"tool": "calc", "isCall": true, "prompt": "math.log(100, 10)"} {"tool": "calc", "isCall": true, "prompt": "math.cos(0)"} In chat use examples: 1. Please, wait while I calculate 2+2... {"tool": "calc", "isCall": false, "prompt": "2+2"} 2. Plase, wait while I calculate the square root of 25... {"tool": "calc", "isCall": true, "prompt": "math.sqrt(25)"} 3. This is how I perform calculations: {"tool": "calc", "isCall": false, "prompt": "math.pow(3, 2)"} 4. (Do not do this, this would block the user from seeing the result.) Alright! Here's the result of a complex calculation involving trigonometry and logarithms. ``` {"tool": "calc", "isCall": true, "prompt": "math.sin(math.pi / 4) + math.log(10, 10)"} ]"""
searchPrompt = """[System: You have ability to search queries on a search engine, via tools provided to you by system. (Warning: Each search call can take up to 30 or more seconds. Only one search function can be called per round. If a response has already been received, the system will answer based on that response. If the query needs to be searched again, the system will ask the user if they want to requery.) To call a tool you need to write a json in a empty line; like writing it at the end of message. To look up queries; you need to follow this example JSON: {"tool": "search", "isCall": true, "prompt": "What is the latest news on climate change?"} > 'tool' variable is used to define which tool you are calling > 'isCall' used to confirm that you are calling that function and not showing it for example > 'prompt' the query that will be searched. Here's a few more examples so you can understand better To show as an example> {"tool": "search", "isCall": false, "prompt": "How to bake a chocolate cake?"} {"tool": "search", "isCall": false, "prompt": "What are the symptoms of the flu?"} {"tool": "search", "isCall": false, "prompt": "Best practices for remote work"} To actually use the tool> {"tool": "search", "isCall": true, "prompt": "How to invest in stocks?"} {"tool": "search", "isCall": true, "prompt": "What is the current status of the Mars rover?"} {"tool": "search", "isCall": true, "prompt": "Latest advancements in AI technology"} In chat use examples: 1. Please, wait while I search for the latest trends in technology... {"tool": "search", "isCall": false, "prompt": "Latest trends in technology"} 2. Please, wait while I search for the best ways to improve mental health... {"tool": "search", "isCall": true, "prompt": "Best ways to improve mental health"} 3. This is how I perform searches: {"tool": "search", "isCall": false, "prompt": "How to start a garden?"} 4. (Do not do this, this would block the user from seeing the result.) Alright! Here's the result of a search on the impact of social media on teenagers. ``` {"tool": "search", "isCall": true, "prompt": "Impact of social media on teenagers"} ]"""
# --- === CONFIG === ---


def _refresh_env():
    """Pull the JSON document at ENV_URL into os.environ and re-validate keys/models.

    Failures are logged and swallowed on purpose: a bad reload must not take
    down the running app.
    """
    try:
        response = requests.get(os.environ["ENV_URL"], timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        for key, value in response.json().items():
            # os.environ only accepts strings; JSON numbers/booleans would raise.
            os.environ[key] = str(value)
        # NOTE(review): the module globals api_key / oai_api_key / base_url are read
        # once at import time and are not re-read here, so only OPENAI_API_MODELS
        # visibly reacts to a reload -- confirm whether that is intended.
        handleApiKeys()
        checkModels()
        loadModels()
    except Exception as e:
        print(f"Error loading environment variables: {e}")


def loadENV():
    """Load the remote environment once, then keep reloading it in the background.

    Does nothing unless ENV_HANDLE is "url on env". The worker is a daemon
    thread that reloads every ENV_REFRESH_SECONDS seconds.
    """
    if ENV_HANDLE != "url on env":
        return

    def worker():
        while True:
            # Sleep first: the initial load below has already happened, so an
            # immediate second reload would only duplicate it.
            time.sleep(ENV_REFRESH_SECONDS)
            _refresh_env()

    _refresh_env()
    threading.Thread(target=worker, daemon=True).start()


def checkModels():
    """Probe `{base_url}/models` and fall back to the OpenAI key if it is unusable.

    When API_BASE is not "env", or the endpoint answers without a 'data'
    field, `base_url` is cleared and `api_key` is replaced by `oai_api_key`.
    Network errors are only logged.
    """
    # `api_key` must be declared global too, otherwise the assignment below
    # would create a local and the fallback would silently do nothing.
    global base_url, api_key
    if API_BASE != "env":
        base_url = ""
        api_key = oai_api_key
        return
    try:
        response = requests.get(
            f"{base_url}/models",
            headers={"Authorization": f"Bearer {get_api_key()}"},
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        if 'data' not in response.json():
            base_url = ""
            api_key = oai_api_key
    except Exception as e:
        print(f"Error testing API endpoint: {e}")


def loadModels():
    """Populate the `models` list and the OpenAI-style `modelList` payload.

    Reads a JSON array from the OPENAI_API_MODELS environment variable and
    falls back to `def_models` when it is missing or not valid JSON.
    """
    global models, modelList
    try:
        models = json.loads(os.environ.get('OPENAI_API_MODELS', def_models))
    except json.JSONDecodeError:
        models = json.loads(def_models)
    models = sorted(models)
    modelList = {
        "object": "list",
        "data": [{"id": v, "object": "model", "created": 0, "owned_by": "system"} for v in models]
    }


def _key_is_valid(key):
    """Return True when `{base_url}/models` answers with a 'data' field for `key`."""
    response = requests.get(
        f"{base_url}/models",
        headers={"Authorization": f"Bearer {key}"},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return 'data' in response.json()


def handleApiKeys():
    """Validate `api_key`, keeping only the working entries of a comma-separated list.

    Raises:
        RuntimeError: if no key works (list form) or the single key is invalid
            or could not be checked.
    """
    global api_key
    if ',' in api_key:
        output = []
        for key in api_key.split(','):
            try:
                if _key_is_valid(key):
                    output.append(key)
            except Exception as e:
                print((F"API key {key} is not valid or an actuall error happend {e}"))
        # The previous `len(output) == 1` check rejected the case of exactly one
        # working key and accepted the case of none.
        if not output:
            raise RuntimeError("No API key is working")
        api_key = ",".join(output)
    else:
        try:
            if not _key_is_valid(api_key):
                raise RuntimeError("Current API key is not valid")
        except Exception as e:
            raise RuntimeError(f"Current API key is not valid or an actual error happened: {e}") from e


def safe_eval(expression):
    """Evaluate a math expression using only names exported by the `math` module.

    SECURITY NOTE: this runs `eval` on text produced by the model, which is
    untrusted. Builtins are removed and every name in the compiled code must
    come from `math`, but this is a restriction, not a sandbox -- review before
    widening the allowed names.

    Raises:
        NameError: if the expression references a name outside `math`.
    """
    print(expression)
    allowed_names = {name: obj for name, obj in math.__dict__.items() if not name.startswith("__")}
    allowed_names['math'] = math
    code = compile(expression, "", "eval")
    for name in code.co_names:
        if name not in allowed_names:
            raise NameError(f"Use of {name} is not allowed")
    return eval(code, {"__builtins__": {}}, allowed_names)


def get_api_key(call='api_key'):
    """Return one API key, picked at random when several are configured.

    `call` selects the pool: 'oai_api_key' for the OpenAI key, anything else
    for the proxy key.
    """
    key = oai_api_key if call == 'oai_api_key' else api_key
    if ',' in key:
        return random.choice(key.split(','))
    return key


def encodeChat(messages):
    """Flatten chat messages into one ChatML-like string (used as moderation input)."""
    output = []
    for message in messages:
        name = f" [{message['name']}]" if 'name' in message else ''
        output.append(f"<|im_start|>{message['role']}{name}\n{message['content']}<|end_of_text|>")
    return "\n".join(output)


def _post_moderation(url, key, payload):
    """POST `payload` to a moderation endpoint and return the decoded JSON body."""
    response = requests.post(
        url,
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {key}"
        },
        json=payload,
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()


def moderate(messages):
    """Run the conversation through the moderation endpoint.

    Tries `base_url` first and the OpenAI fallback second.

    Returns:
        The raw moderation response when anything was flagged, otherwise
        False (also when both endpoints fail).
    """
    payload = {"input": encodeChat(messages)}
    try:
        moderation_result = _post_moderation(f"{base_url}/moderations", get_api_key(call='api_key'), payload)
    except requests.exceptions.RequestException as e:
        print(f"Error during moderation request to {base_url}: {e}")
        try:
            moderation_result = _post_moderation(f"{OPENAI_FALLBACK_URL}/moderations", get_api_key(call='oai_api_key'), payload)
        except requests.exceptions.RequestException as e:
            print(f"Error during moderation request to fallback URL: {e}")
            return False

    if not isinstance(moderation_result, dict):
        return False
    results = moderation_result.get("results")
    if results is None:
        # Some backends answer with a single flat result instead of a list.
        flagged = bool(moderation_result.get("flagged"))
    else:
        flagged = any(isinstance(result, dict) and result.get("flagged") for result in results)
    return moderation_result if flagged else False


def _moderation_notice(mode):
    """Build the user-facing refusal text, listing flagged categories when known."""
    if isinstance(mode, list):
        entries = mode
    else:
        # Categories live at the top level for flat answers and under
        # "results" for the standard response shape; look in both.
        entries = [mode] + list(mode.get('results') or [])
    reasons = []
    for entry in entries:
        if not isinstance(entry, dict):
            continue
        for category, flagged in (entry.get('categories') or {}).items():
            if flagged and category not in reasons:
                reasons.append(category)
    if reasons:
        return "[MODERATION] I'm sorry, but I can't assist with that.\n\nReasons:\n```\n" + "\n".join([f"{i+1}. {reason}" for i, reason in enumerate(reasons)]) + "\n```"
    return "[MODERATION] I'm sorry, but I can't assist with that."


async def _iter_sse_json(response):
    """Yield the JSON payload of every `data: ` line in a streamed response."""
    async for line in response.content:
        if not line:
            continue
        line_str = line.decode('utf-8')
        if not line_str.startswith("data: "):
            continue
        line_str = line_str[6:].strip()
        if line_str == "[DONE]":
            continue
        try:
            yield json.loads(line_str)
        except json.JSONDecodeError:
            continue


async def streamChat(params):
    """Yield chat-completion chunks for `params`, trying `base_url` then OpenAI.

    For o1 models the unsupported fields are stripped, the request is sent
    without streaming, and the full answer is yielded as one synthetic chunk
    so callers can treat both cases alike. Yields nothing if both endpoints
    fail with an aiohttp client error.
    """
    is_o1 = params.get("model") in O1_MODELS
    if is_o1:
        params.pop("temperature", None)
        params.pop("top_p", None)
        if "max_tokens" in params:
            params["max_completion_tokens"] = params.pop("max_tokens")
        if "messages" in params:
            # Build a new list: removing items while iterating skipped every
            # second consecutive system message and mutated the caller's list.
            params["messages"] = [m for m in params["messages"] if m["role"] != "system"]
        params["stream"] = False

    endpoints = (
        (f"{base_url}/chat/completions", 'api_key'),
        (f"{OPENAI_FALLBACK_URL}/chat/completions", 'oai_api_key'),
    )
    async with aiohttp.ClientSession() as session:
        for url, key_name in endpoints:
            headers = {"Authorization": f"Bearer {get_api_key(call=key_name)}", "Content-Type": "application/json"}
            try:
                async with session.post(url, headers=headers, json=params) as r:
                    r.raise_for_status()
                    if is_o1:
                        response_data = await r.json()
                        yield {"choices": [{"delta": {"content": response_data["choices"][0]["message"]["content"]}}]}
                    else:
                        async for message in _iter_sse_json(r):
                            yield message
                return
            except aiohttp.ClientError:
                # Try the next endpoint; give up silently after the last one.
                continue


def searchEngine(query):
    """Return the text answer of the custom `/search` endpoint for `query`.

    Raises:
        requests.exceptions.RequestException: on network or HTTP errors.
    """
    ### This /search endpoint is custom made, OpenAI does not have it.
    ### If you duplicate this space, please either try to find another API or make one yourself.
    response = requests.get(f"{base_url}/search?query={requests.utils.quote(query)}", timeout=SEARCH_TIMEOUT)
    response.raise_for_status()
    response_data = response.json()
    # `or [{}]` also covers an explicitly empty "choices" list.
    choices = response_data.get("choices") or [{}]
    return choices[0].get("message", {}).get("content", "")


def rnd(length=8):
    """Return a random alphanumeric string of `length` characters."""
    letters = string.ascii_letters + string.digits
    return ''.join(random.choice(letters) for _ in range(length))


def _image_part(path, mime_type):
    """Return an `image_url` content part for the image at `path`."""
    if IMAGE_HANDLE == "base64":
        with open(path, "rb") as image_file:
            encoded_image = base64.b64encode(image_file.read()).decode('utf-8')
        return {"type": "image_url", "image_url": {"url": "data:" + mime_type + ";base64," + encoded_image}}
    return {"type": "image_url", "image_url": {"url": GRADIO_URL_PATH + "file=" + requests.utils.quote(str(path))}}


def _read_text_file(path):
    """Return the UTF-8 decoded content of `path` (raises UnicodeDecodeError for binary data)."""
    with open(path, "rb") as data_file:
        return data_file.read().decode('utf-8')


def handleMultimodalData(model, role, data):
    """Convert one Gradio chat entry into an OpenAI chat message dict.

    Handled shapes of `data`:
      * dict with a non-empty 'files' list: images become `image_url` parts
        (vision models only); text/application files are inlined as text.
      * dict with 'text' and no files: plain text message.
      * tuple whose first item is a file path: one image or inlined file.
      * anything else: its `.text` attribute, or `str(data)`.
    """
    if isinstance(data, dict) and data.get('files'):
        result, handler, hasFoundFile = [], ["[System: This message contains files; the system will be splitting it.]"], False
        for file in data['files']:
            if isinstance(file, str):
                # Bare path: the metadata is derived below.
                file = {"path": file}
            path = file['path']
            # Gradio may report no mime type; guess from the name instead of crashing.
            mime = file.get('mime_type') or mimetypes.guess_type(str(path))[0] or ""
            if mime.startswith("image/") and model in vision_models:
                result.append(_image_part(path, mime))
            if mime.startswith(("text/", "application/")):
                hasFoundFile = True
                try:
                    file_content = _read_text_file(path)
                except UnicodeDecodeError:
                    # Binary payload behind a text-like mime type: skip it.
                    continue
                handler.append("<|file_start|>" + (file.get('orig_name') or os.path.basename(str(path))) + "\n" + file_content + "<|file_end|>")
        text = data.get('text') or ''
        if hasFoundFile:
            handler.append(text)
            return {"role": role, "content": [{"type": "text", "text": "\n\n".join(handler)}] + result}
        return {"role": role, "content": [{"type": "text", "text": text}] + result}

    if isinstance(data, dict) and 'text' in data:
        # A text-only multimodal message used to be sent as the repr of the dict.
        return {"role": role, "content": data.get('text') or ''}

    if isinstance(data, tuple) and data:
        file_path = data[0]
        mime_type, _ = mimetypes.guess_type(file_path)
        content = getattr(data, 'text', str(data))
        if mime_type and mime_type.startswith("image/") and model in vision_models:
            return {"role": role, "content": [_image_part(file_path, mime_type)]}
        if mime_type and mime_type.startswith(("text/", "application/")):
            try:
                file_content = _read_text_file(file_path)
            except UnicodeDecodeError:
                return {"role": role, "content": [{"type": "text", "text": content}]}
            return {"role": role, "content": [{"type": "text", "text": "<|file_start|>" + os.path.basename(file_path) + "\n" + file_content + "<|file_end|>"}]}
        return {"role": role, "content": content}

    return {"role": role, "content": getattr(data, 'text', str(data))}


class FileMessage(GradioModel):
    """A file attached to a chat message, with optional alternative text."""
    file: FileData
    alt_text: Optional[str] = None


class MultimodalMessage(GradioModel):
    """A chat message made of optional text plus optional file attachments."""
    text: Optional[str] = None
    files: Optional[List[FileMessage]]


async def _run_blocking(func, *args):
    """Run a blocking callable in the default executor so the event loop stays free."""
    return await asyncio.get_running_loop().run_in_executor(None, func, *args)


async def respond(
    message,
    history: List[Tuple[
        Optional[MultimodalMessage],
        Optional[MultimodalMessage],
    ]],
    system_message,
    model_name,
    max_tokens,
    temperature,
    top_p,
    seed,
    random_seed,
    calcBeta,
    searchBeta,
    betterSystemPrompt
):
    """Gradio chat handler: stream the model's answer for `message`.

    Builds the message list (tool prompts, system prompt, history, new
    message), moderates it, streams the completion, moderates the answer,
    then executes any calculator/search tool calls found in it and asks the
    model again with the tool results appended.

    Yields:
        The accumulated answer text after every update, or a moderation notice.
    """
    messages = []
    if calcBeta:
        messages.append({"role": "system", "content": calcPrompt})
    if searchBeta:
        messages.append({"role": "system", "content": searchPrompt})
    if betterSystemPrompt:
        messages.append({"role": "system", "content": f"You are a helpful assistant. You are an OpenAI GPT model named {model_name}. The current time is {time.strftime('%Y-%m-%d %H:%M:%S')}. Please adhere to OpenAI's usage policies and guidelines. Ensure your responses are accurate, respectful, and within the scope of OpenAI's rules."})
    else:
        messages.append({"role": "system", "content": system_message})

    for val in history:
        for role, entry in (("user", val[0]), ("assistant", val[1])):
            if entry is None:
                continue
            converted = handleMultimodalData(model_name, role, entry)
            if converted:
                messages.append(converted)

    if message:
        user_message = handleMultimodalData(model_name, "user", message)
        if user_message:
            messages.append(user_message)

    # `moderate` and `searchEngine` use blocking HTTP; run them off the event
    # loop so other sessions keep streaming meanwhile.
    mode = await _run_blocking(moderate, messages)
    if mode:
        yield _moderation_notice(mode)
        return

    def chat_params():
        """Return a fresh request body; a new dict is needed because streamChat edits it."""
        return {
            "model": model_name,
            "messages": messages,
            "max_tokens": max_tokens,
            "temperature": temperature,
            "top_p": top_p,
            "seed": (random.randint(0, 2**32) if random_seed else seed),
            "user": rnd(),
            "stream": True
        }

    async def handleResponse(completion, prefix="", didSearchedAlready=False):
        """Stream one completion, then resolve tool calls and re-query if needed."""
        response = ""
        isRequeryNeeded = False
        async for token in completion:
            choices = token.get('choices') or []
            if not choices:
                # Chunks without choices carry no text.
                continue
            delta = choices[0].get('delta') or {}
            # `or` rather than a .get default: the key may be present with None.
            response += delta.get("content") or delta.get("refusal") or ""
            yield f"{prefix}{response}"

        mode = await _run_blocking(moderate, messages + [{"role": "assistant", "content": response}])
        if mode:
            yield _moderation_notice(mode)
            return

        # A tool call is a JSON object occupying a whole line of the answer.
        for line in response.split('\n'):
            try:
                data = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not (isinstance(data, dict) and data.get("isCall") and "prompt" in data):
                continue
            tool = data.get("tool")
            # Only honour tools the user enabled; the model can be talked into
            # emitting a call even when it was never told about the tool.
            if tool == "calc" and calcBeta:
                try:
                    result = safe_eval(data["prompt"])
                    response = response.replace(line, f'[System: `{data["prompt"]}` === `{result}`]')
                except Exception as e:
                    response = response.replace(line, f'[System: Error in calculation; `{e}`]')
            elif tool == "search" and searchBeta:
                if didSearchedAlready:
                    response = response.replace(line, f'[System: One search per response is allowed; due to how long and resource it takes; query: `{data["prompt"]}]`]')
                else:
                    try:
                        result = await _run_blocking(searchEngine, data["prompt"])
                        result_escaped = result.replace('`', '\\`')
                        response = response.replace(line, f'[System: `{data["prompt"]}` ===\n```\n{result_escaped}\n```\n]')
                        didSearchedAlready = True
                    except Exception as e:
                        response = response.replace(line, f'[System: Error in search function; `{e}`]')
            else:
                continue
            isRequeryNeeded = True
            yield f"{prefix}{response}"

        if isRequeryNeeded:
            # Feed the tool results back and let the model continue its answer.
            messages.append({"role": "assistant", "content": response})
            async for res in handleResponse(streamChat(chat_params()), f"{prefix}{response}\n\n", didSearchedAlready):
                yield res

    async for res in handleResponse(streamChat(chat_params())):
        yield res
# Start-up order matters: validate the keys, build the model list, probe the
# endpoint, then start the optional background environment reload.
handleApiKeys()
loadModels()
checkModels()
loadENV()

lastUpdateMessage = "Back online yet again. Removed support for image generation."

# NOTE(review): the description's line breaks are written as "\n" because a
# single-quoted f-string cannot span physical lines. Any markup that used to
# sit at those breaks is not visible here -- confirm against the upstream copy.
demo = gr.ChatInterface(
    respond,
    title="gpt-4o-mini",
    description=(
        "A OpenAI API proxy!\n"
        "View API docs [here](/api/v1/docs) [Yes you can use this as an API in a simpler manner].\n"
        f"[Last update: {lastUpdateMessage}] Also you can only submit images to vision models; txt/code/etc. to all models."
    ),
    multimodal=True,
    # Order must match the extra parameters of `respond` after `history`.
    additional_inputs=[
        gr.Textbox(value="You are a helpful assistant. You are an OpenAI GPT model. Please adhere to OpenAI's usage policies and guidelines. Ensure your responses are accurate, respectful, and within the scope of OpenAI's rules.", label="System message"),
        gr.Dropdown(choices=models, value="gpt-4o-mini", label="Model"),
        gr.Slider(minimum=1, maximum=4096, value=4096, step=1, label="Max new tokens"),
        gr.Slider(minimum=0.1, maximum=2.0, value=0.7, step=0.05, label="Temperature"),
        gr.Slider(minimum=0.05, maximum=1.0, value=0.95, step=0.05, label="Top-p (nucleus sampling)"),
        gr.Slider(minimum=0, maximum=2**32, value=0, step=1, label="Seed"),
        gr.Checkbox(label="Randomize Seed", value=True),
        gr.Checkbox(label="FakeTool [Calculator beta]", value=True),
        gr.Checkbox(label="FakeTool [Search engine beta (Warning; each query takes up to 30 seconds)]", value=True),
        gr.Checkbox(label="Better system prompt (ignores the system prompt set by user.)", value=True),
    ],
    css="footer{display:none !important}",
    head=""
)

app = FastAPI()


@app.get("/declined")
def test():
    """Serve the static page shown after a declined authorization."""
    # NOTE(review): this page appears to have lost its HTML markup; the text is
    # kept exactly as found.
    return HTMLResponse(content=""" Declined

Ok, you can go back to Hugging Face. I just didn't have any idea how to handle decline so you are redirected here.

Go back """)


@app.get("/api/v1/docs")
def html():
    """Serve the API documentation page from `index.html`."""
    return FileResponse("index.html")


app = gr.mount_gradio_app(app, demo, path="/")


class ArgParser(argparse.ArgumentParser):
    """Command-line parser that parses `sys.argv` on construction.

    The parsed namespace is available as `self.args` with the fields
    `server` (bind host), `port` (default 7860) and `dev` (auto-reload flag).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.add_argument("-s", "--server", type=str, default="")
        self.add_argument("-p", "--port", type=int, default=7860)
        self.add_argument("-d", "--dev", default=False, action="store_true")
        self.args = self.parse_args(sys.argv[1:])


if __name__ == "__main__":
    args = ArgParser().args
    # The two original branches differed only in `reload`, which follows --dev.
    uvicorn.run("__main__:app", host=args.server, port=args.port, reload=args.dev)