from fastapi import FastAPI, File, UploadFile, Request, HTTPException
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import requests
import asyncio
from typing import Dict
import os
import json
import re

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

HTML_CONTENT = """
Radd PRO Uploader

Radd PRO Uploader

or drag and drop file here/paste image

Allowed file types: .zip, .mp4, .txt, .mp3, all image types, .pdf
""" @app.get("/", response_class=HTMLResponse) async def index(): return HTML_CONTENT @app.post("/upload") async def handle_upload(request: Request, file: UploadFile = File(...)): content_range = request.headers.get('Content-Range') if not content_range: raise HTTPException(status_code=400, detail="Content-Range header is missing") # Parse Content-Range header content_range_match = re.match(r'bytes (\d+)-(\d+)/(\d+)', content_range) if not content_range_match: raise HTTPException(status_code=400, detail="Invalid Content-Range header format") start_byte = int(content_range_match.group(1)) end_byte = int(content_range_match.group(2)) total_size = int(content_range_match.group(3)) if start_byte > end_byte or end_byte >= total_size: raise HTTPException(status_code=400, detail="Invalid Content-Range header values") if not file: raise HTTPException(status_code=400, detail="No file part") if file.filename == '': raise HTTPException(status_code=400, detail="No selected file") # Create temporary directory to store the chunks if it doesn't exist upload_dir = os.path.join('/tmp', 'uploads') os.makedirs(upload_dir, exist_ok=True) # Temporary file path temp_file_path = os.path.join(upload_dir, file.filename) # Path to store upload metadata meta_file_path = temp_file_path + '.json' if start_byte == 0: # Start of a new upload cookies = await get_cookies() if 'csrftoken' not in cookies or 'sessionid' not in cookies: raise HTTPException(status_code=500, detail="Failed to obtain necessary cookies") # Initiate the upload upload_result = await initiate_upload(cookies, file.filename, file.content_type) if not upload_result or 'upload_url' not in upload_result: raise HTTPException(status_code=500, detail="Failed to initiate upload") # Save the upload_url and serving_url to a metadata file with open(meta_file_path, 'w') as meta_file: json.dump(upload_result, meta_file) else: # For subsequent chunks, read the metadata file to get upload_url and serving_url if not os.path.exists(meta_file_path): raise HTTPException(status_code=400, detail="Upload metadata not found") with open(meta_file_path, 'r') as meta_file: upload_result = json.load(meta_file) # Read the chunk content file_content = await file.read() # Write the chunk to the temporary file at the correct offset with open(temp_file_path, 'ab') as f: f.seek(start_byte) f.write(file_content) # Check if the upload is complete file_size = os.path.getsize(temp_file_path) if file_size == total_size: # Read the entire file content with open(temp_file_path, 'rb') as f: full_file_content = f.read() # Upload the file to Replicate upload_success = await retry_upload(upload_result['upload_url'], full_file_content, file.content_type) if not upload_success: raise HTTPException(status_code=500, detail="Failed to upload file to Replicate") original_url = upload_result.get('serving_url') if original_url: mirrored_url = f"/rbxg/{original_url.split('/pbxt/')[1]}" # Clean up the temporary files os.remove(temp_file_path) os.remove(meta_file_path) return JSONResponse(content={"url": mirrored_url}) else: raise HTTPException(status_code=500, detail="serving_url not found") else: return JSONResponse(content={"status": "chunk uploaded"}) @app.get("/rbxg/{path:path}") async def handle_video_stream(path: str, request: Request): original_url = f'https://replicate.delivery/pbxt/{path}' range_header = request.headers.get('Range') headers = {'Range': range_header} if range_header else {} response = requests.get(original_url, headers=headers, stream=True) def generate(): for chunk in 
@app.get("/rbxg/{path:path}")
async def handle_video_stream(path: str, request: Request):
    # Proxy the file from Replicate's delivery CDN, forwarding Range requests
    original_url = f'https://replicate.delivery/pbxt/{path}'
    range_header = request.headers.get('Range')
    headers = {'Range': range_header} if range_header else {}

    response = requests.get(original_url, headers=headers, stream=True)

    def generate():
        for chunk in response.iter_content(chunk_size=8192):
            yield chunk

    headers = dict(response.headers)
    headers['Access-Control-Allow-Origin'] = '*'
    headers['Content-Disposition'] = 'inline'
    if response.status_code == 206:
        headers['Content-Range'] = response.headers.get('Content-Range')

    return StreamingResponse(generate(), status_code=response.status_code, headers=headers)


@app.get("/embed")
async def embed_video(url: str, thumbnail: str):
    html = f''' '''
    return HTMLResponse(content=html)


async def get_cookies() -> Dict[str, str]:
    # Fetch a Replicate page to obtain the csrftoken and sessionid cookies
    try:
        response = requests.get(
            'https://replicate.com/levelsio/neon-tokyo',
            headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
        )
        return dict(response.cookies)
    except Exception as e:
        print(f'Error fetching the page: {e}')
        return {}


async def initiate_upload(cookies: Dict[str, str], filename: str, content_type: str) -> Dict:
    url = f'https://replicate.com/api/upload/{filename}?content_type={content_type}'
    try:
        headers = {
            'X-CSRFToken': cookies.get('csrftoken'),
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
            'Referer': 'https://replicate.com/levelsio/neon-tokyo',
            'Origin': 'https://replicate.com',
            'Accept': '*/*',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'identity',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin'
        }
        response = requests.post(url, cookies=cookies, headers=headers)
        return response.json()
    except Exception as e:
        print(f'Error initiating upload: {e}')
        raise


async def upload_file(upload_url: str, file_content: bytes, content_type: str) -> bool:
    try:
        headers = {'Content-Type': content_type}
        response = requests.put(upload_url, data=file_content, headers=headers)
        return response.status_code in [200, 201, 204]
    except Exception as e:
        print(f'Error uploading file: {e}')
        return False


async def retry_upload(upload_url: str, file_content: bytes, content_type: str,
                       max_retries: int = 5, delay: int = 1) -> bool:
    retries = 0
    while retries < max_retries:
        success = await upload_file(upload_url, file_content, content_type)
        if success:
            return True
        print(f"Upload attempt {retries + 1} failed. Retrying...")
        retries += 1
        await asyncio.sleep(delay)
        delay = min(delay * 2, 60)  # Exponential backoff
    return False
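
# Minimal way to run the app locally; the host and port are assumptions for this
# sketch, and any ASGI server works (e.g. `uvicorn main:app`, assuming this file
# is saved as main.py).
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)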