import gradio as gr
import json
import posixpath

from fastapi import HTTPException, Path, Query, Request
from fastapi.responses import StreamingResponse
from gradio_huggingfacehub_search import HuggingfaceHubSearch
from huggingface_hub import HfApi, HfFileSystem
from typing import Annotated, Any, NamedTuple
from urllib.parse import urlencode

from _hf_explorer import FileExplorer
from _hf_gguf import standard_metadata, TokenType, LlamaFileType, GGUFValueType, HuggingGGUFstream

hfapi = HfApi()


class MetadataState(NamedTuple):
    var: dict[str, Any]              # session variables (selected repo_file/branch)
    key: dict[str, tuple[int, Any]]  # current metadata: key -> (type, value)
    add: dict[str, Any]              # pending additions/changes: key -> (type, value)
    rem: set                         # pending removals (key names)


def init_state():
    return MetadataState(
        var = {},
        key = {},
        add = {},
        rem = set(),
    )


def human_readable_metadata(
    meta: MetadataState,
    key: str,
    typ: int,
    val: Any,
) -> tuple[str, str, Any]:
    typ = GGUFValueType(typ).name

    if typ == 'ARRAY':
        val = '[[...], ...]'
    elif isinstance(val, list):
        typ = f'[{typ}][{len(val)}]'
        if len(val) > 8:
            val = str(val[:8])[:-1] + ', ...]'
        else:
            val = str(val)
    elif isinstance(val, dict):
        val = '[' + ', '.join((f'{k}: {v}' for k, v in val.items())) + ']'
    elif key == 'general.file_type':
        try:
            ftype = LlamaFileType(val).name
        except ValueError:
            ftype = 'UNKNOWN'

        val = f'{ftype} ({val})'
    elif key.endswith('_token_id'):
        tokens = meta.key.get('tokenizer.ggml.tokens', (-1, []))[1]
        if isinstance(val, int) and val >= 0 and val < len(tokens):
            val = f'{tokens[val]} ({val})'

    return key, typ, val
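
# Illustrative example of what the helper above produces (values assumed for
# illustration, not read from a real file):
#   human_readable_metadata(meta, 'general.file_type', GGUFValueType.UINT32, 7)
#   -> ('general.file_type', 'UINT32', 'MOSTLY_Q8_0 (7)')
# assuming LlamaFileType(7) maps to MOSTLY_Q8_0 in _hf_gguf, as in llama.cpp.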

with gr.Blocks() as blocks:
    with gr.Tab("Editor"):
        with gr.Row():
            hf_search = HuggingfaceHubSearch(
                label = "Search Huggingface Hub",
                placeholder = "Search for models on Huggingface",
                search_type = "model",
                sumbit_on_select = True,  # (sic) spelling matches the component's parameter name
                scale = 2,
            )

            hf_branch = gr.Dropdown(
                None,
                label = "Branch",
                scale = 1,
            )

            gr.LoginButton(
                "Sign in to access gated/private repos",
                scale = 1,
            )

        hf_file = FileExplorer(
            visible = False,
        )

        with gr.Row():
            with gr.Column():
                meta_keys = gr.Dropdown(
                    None,
                    label = "Modify Metadata",
                    info = "Search by metadata key name",
                    allow_custom_value = True,
                    visible = False,
                )

            with gr.Column():
                meta_types = gr.Dropdown(
                    [e.name for e in GGUFValueType],
                    label = "Metadata Type",
                    info = "Select data type",
                    type = "index",
                    visible = False,
                )

            with gr.Column():
                btn_delete = gr.Button(
                    "Remove Key",
                    variant = "stop",
                    visible = False,
                )

        meta_boolean = gr.Checkbox(
            label = "Boolean",
            info = "Click to update value",
            visible = False,
        )

        with gr.Row():
            meta_token_select = gr.Dropdown(
                label = "Select token",
                info = "Search by token name",
                type = "index",
                allow_custom_value = True,
                visible = False,
            )

            meta_token_type = gr.Dropdown(
                [e.name for e in TokenType],
                label = "Token type",
                info = "Select token type",
                type = "index",
                visible = False,
            )

        meta_lookup = gr.Dropdown(
            label = "Lookup token",
            info = "Search by token name",
            type = "index",
            allow_custom_value = True,
            visible = False,
        )

        meta_number = gr.Number(
            label = "Number",
            info = "Enter to update value",
            visible = False,
        )

        meta_string = gr.Textbox(
            label = "String",
            info = "Enter to update value (Shift+Enter for new line)",
            visible = False,
        )

        meta_array = gr.Matrix(
            None,
            label = "Unsupported",
            row_count = (1, "fixed"),
            height = "1rem",
            interactive = False,
            visible = False,
        )

        meta_changes = gr.HighlightedText(
            None,
            label = "Metadata Changes",
            color_map = {"add": "green", "rem": "red"},
            interactive = False,
            visible = False,
        )

        btn_download = gr.Button(
            "Download GGUF",
            variant = "primary",
            visible = False,
        )

        file_meta = gr.Matrix(
            None,
            col_count = (3, "fixed"),
            headers = [
                "Metadata Name",
                "Type",
                "Value",
            ],
            datatype = ["str", "str", "str"],
            column_widths = ["35%", "15%", "50%"],
            wrap = True,
            interactive = False,
            visible = False,
        )

    with gr.Tab("Help"):
        gr.Markdown(
            """# Huggingface GGUF Editor

An advanced GGUF editor, reading GGUF files directly from Huggingface repositories and applying changes to your own copies.

Below you will find a collection of example use-cases to show you how to perform a few common GGUF editing operations:
""",
        )

        with gr.Column(render = False) as example_group:
            example_description = gr.Markdown(
                visible = False,
            )

            with gr.Row():
                with gr.Column():
                    example_keys = gr.Dropdown(
                        allow_custom_value = True,
                        visible = False,
                    )

                with gr.Column():
                    example_types = gr.Dropdown(
                        allow_custom_value = True,
                        visible = False,
                    )

                with gr.Column():
                    example_delete = gr.Button(
                        interactive = False,
                        visible = False,
                    )

            example_boolean = gr.Checkbox(
                visible = False,
            )

            with gr.Row():
                example_token_select = gr.Dropdown(
                    allow_custom_value = True,
                    visible = False,
                )

                example_token_type = gr.Dropdown(
                    allow_custom_value = True,
                    visible = False,
                )

            example_number = gr.Number(
                visible = False,
            )

            example_string = gr.Textbox(
                visible = False,
            )

        example_components = [
            example_description,
            example_keys,
            example_types,
            example_delete,
            example_boolean,
            example_token_select,
            example_token_type,
            example_number,
            example_string,
        ]

        example_defaults = {
            example_description: dict(
                value = "",
                visible = False,
            ),
            example_keys: dict(
                value = "",
                label = meta_keys.label,
                info = "Select this metadata key",
                visible = False,
            ),
            example_types: dict(
                value = "",
                label = meta_types.label,
                info = "This will have the correct type set automatically",
                visible = False,
            ),
            example_delete: dict(
                value = btn_delete.value,
                variant = btn_delete.variant,
                visible = False,
            ),
            example_boolean: dict(
                value = False,
                label = meta_boolean.label,
                info = "",
                visible = False,
            ),
            example_token_select: dict(
                value = "",
                label = meta_token_select.label,
                visible = False,
            ),
            example_token_type: dict(
                value = "",
                label = meta_token_type.label,
                visible = False,
            ),
            example_number: dict(
                value = 0,
                precision = 0,
                label = meta_number.label,
                info = "",
                visible = False,
            ),
            example_string: dict(
                value = "",
                label = meta_string.label,
                info = "",
                visible = False,
            ),
        }
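
        # Each example below only specifies the properties it overrides;
        # show_example() merges them over these defaults with
        # `example_defaults[k] | v`, so all other properties are reset to the
        # values above when switching between examples.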

        example_properties = [
            dict(
                label = 'Fix "missing pre-tokenizer type" warning',
                outputs = {
                    example_description: dict(
                        value = """## Fixing Pre-Tokenizer warning

Custom Pre-Tokenization was added to `llama.cpp` on April 29th 2024, and since then basically every model using BPE tokenization needs support added to `llama.cpp` to work correctly.

Models converted using the conversion script before the support for this specific model was added will either be missing the pre-tokenizer metadata or have it set incorrectly to `default`.

See the models list in [llama.cpp/convert_hf_to_gguf_update.py](https://github.com/ggerganov/llama.cpp/blob/master/convert_hf_to_gguf_update.py#L67) to find out which pre-tokenizer to choose.

Setting the correct pre-tokenizer is often enough to fix the model's tokenizer; however, if it has been quantized using an `imatrix` it should be re-quantized for best performance.

Removing this metadata key from a model will cause `llama.cpp` to output a warning if BPE tokenization is used; it currently has no effect on any other tokenizers.
""",
                        visible = True,
                    ),
                    example_keys: dict(
                        value = "tokenizer.ggml.pre",
                        visible = True,
                    ),
                    example_types: dict(
                        value = GGUFValueType.STRING.name,
                        visible = True,
                    ),
                    example_delete: dict(
                        visible = True,
                    ),
                    example_string: dict(
                        info = "Fill in pre-tokenizer name, can be f.ex. deepseek-llm, command-r, tekken, etc.; you will need to do some research to find the correct one",
                        value = "llama-bpe",
                        visible = True,
                    ),
                },
            ),
            dict(
                label = "Add missing (Fill-in-Middle, EOT, etc) or change incorrect (BOS, EOS, etc) tokens",
                outputs = {
                    example_description: dict(
                        value = """## Add missing/change incorrect tokens

Sometimes converted models will be missing declarations of important tokens like EOT or Fill-in-Middle (prefix, suffix, middle) for various reasons. Other times they may have the incorrect tokens set as BOS, EOS, etc. Either way, missing or incorrectly declared tokens means inference will not work as expected.

Token declaration is made with the metadata key(s) named "tokenizer.ggml.`token name`\_token\_id", which contain the ID (index number) of the token in the token list (`tokenizer.ggml.tokens`).

A recurring issue is misconfigured EOS/EOT/EOM tokens. The need to set each of these, and what they should be, will vary between models, but the effect when they are incorrect is usually the same: infinite generation responses, i.e. inference does not know when to stop. Typically this would be because f.ex. EOS has been set to <|endoftext|> instead of <|im\_end|> (again, model specific, just an example).

Another issue, mainly for code models, is that Fill-in-Middle tokens have not been declared (note: not all models have or use such tokens), causing sub-par results when filling in blanks in code/text. There are 3 main metadata keys that need to be present for this: tokenizer.ggml.`prefix`\_token\_id, `suffix` and `middle`, sometimes also EOT/EOM if it differs from EOS in this mode. They are usually named fim\_`something` or just `PRE`, `SUF` and `MID`; take extra care with DeepSeek-based models, where prefix is (...fim...)`begin`, suffix is `hole` and middle is `end`.
""",
                        visible = True,
                    ),
                    example_keys: dict(
                        value = "tokenizer.ggml.prefix_token_id",
                        info = "Select or enter any metadata key ending with _token_id",
                        visible = True,
                    ),
                    example_types: dict(
                        value = GGUFValueType.UINT32.name,
                        visible = True,
                    ),
                    example_token_select: dict(
                        value = "",
                        label = meta_lookup.label,
                        info = "You can search for the correct token by parts of its name here, then select the correct one from the list of options",
                        visible = True,
                    ),
                    example_number: dict(
                        value = 92295,
                        info = "The token ID will be automatically filled in when you select the token, but you can also fill in the ID directly",
                        visible = True,
                    ),
                },
            ),
            dict(
                label = "Setting the correct token type for a token",
                outputs = {
                    example_description: dict(
                        value = """## Changing a token's type

A common issue is not declaring special control tokens as such, leading to bad tokenization of them when used (usually in the chat template), causing poor responses from the model.

Take f.ex. a model with an incorrectly configured <|im\_start|> token as a normal token instead of a special control token, given the following prompt:
```
<|im_start|>Hello World<|im_end|>
```
This prompt would then be incorrectly tokenized as follows:
```
27 ('<')
91 ('|')
318 ('im')
4906 ('_start')
91 ('|')
29 ('>')
9707 ('Hello')
4337 (' World')
151645 ('<|im_end|>')
```
instead of:
```
151644 ('<|im_start|>')
9707 ('Hello')
4337 (' World')
151645 ('<|im_end|>')
```

Take care to also adjust the value for this token in `tokenizer.ggml.scores` (if it exists) similarly to other special control tokens.

**WARNING**: Even though you have the option to, you should never remove the `tokenizer.ggml.token_type` key!
""", visible = True, ), example_keys: dict( value = "tokenizer.ggml.token_type", visible = True, ), example_types: dict( value = GGUFValueType.INT32.name, visible = True, ), example_delete: dict( visible = True, ), example_token_select: dict( value = "<|im_start|>", info = "You can search for the token by parts of its name here, then select it from the list of options", visible = True, ), example_token_type: dict( value = TokenType.CONTROL.name, info = "Select the appropriate token type, in this case we set it as a special control token", visible = True, ), }, ), dict( label = "Updating or adding a chat template", outputs = { example_description: dict( value = """## Modifying the Chat Template The chat template is a very important part of the model metadata as this provides a template for how to format the conversation prompt to the model. It's not uncommon for these to have bugs (or sometimes just be plain wrong), requiring you to update them to be able to prompt the model correctly. It's also possible to have multiple chat templates for different purposes, the main ones being RAG and Tools, but you can create any additional template you want. The standard metadata key for RAG is `tokenizer.chat_template.rag` and Tools is `tokenizer.chat_template.tool_use`, any metadata key added starting with `tokenizer.chat_template.` will be added as a custom chat template. Any framework based on `llama-cpp-python` will let you select which chat template to use with the `chat_format` option, available as `chat_template.default`, `chat_template.rag`, `chat_template.tool_use`, etc... """, visible = True, ), example_keys: dict( value = "tokenizer.chat_template", info = 'Select this or enter any key starting with "tokenizer.chat_template."', visible = True, ), example_types: dict( value = GGUFValueType.STRING.name, visible = True, ), example_delete: dict( visible = True, ), example_string: dict( info = "Paste in the updated chat template or make changes here. Using an external Jinja2 editor is recommended", value = "{%- for message in messages %}\n {{- '<|' + message['role'] + '|>\\n' }}\n {{- message['content'] + eos_token }}\n{%- endfor %}\n{%- if add_generation_prompt %}\n {{- '<|assistant|>\\n' }}\n{%- endif %}", visible = True, ), }, ), ] examples = gr.Dataset( label = "Choose an example", type = "index", samples = [[]] * len(example_properties), sample_labels = [x["label"] for x in example_properties], ) @gr.on( triggers = [ examples.click, ], inputs = [ examples, ], outputs = [ ] + example_components, show_progress = "hidden", ) def show_example( value: int, ): outputs = example_properties[value]["outputs"] non_outputs = example_components - outputs.keys() all_outputs = dict(((k, type(k)(**(example_defaults[k] | v))) for k, v in outputs.items())) for output in non_outputs: all_outputs[output] = type(output)(**example_defaults[output]) return all_outputs for k, v in example_defaults.items(): for prop, val in v.items(): setattr(k, prop, val) example_group.render() meta_state = gr.State() # init_state # BUG: For some reason using gr.State initial value turns tuple to list? 
    meta_state.value = init_state()
    token_select_indices = gr.State([])

    file_change_components = [
        meta_changes,
        file_meta,
        meta_keys,
        btn_download,
    ]

    state_change_components = [
        meta_state,
    ] + file_change_components

    @gr.on(
        triggers = [
            hf_search.submit,
        ],
        inputs = [
            hf_search,
        ],
        outputs = [
            hf_branch,
        ],
    )
    def get_branches(
        repo: str,
        oauth_token: gr.OAuthToken | None = None,
    ):
        branches = []

        try:
            refs = hfapi.list_repo_refs(
                repo,
                token = oauth_token.token if oauth_token else False,
            )
            branches = [b.name for b in refs.branches]
        except Exception as e:
            raise gr.Error(str(e))

        return {
            hf_branch: gr.Dropdown(
                branches or None,
                value = "main" if "main" in branches else None,
            ),
        }

    @gr.on(
        triggers = [
            hf_search.submit,
            hf_branch.input,
        ],
        inputs = [
            hf_search,
            hf_branch,
        ],
        outputs = [
            hf_file,
        ] + file_change_components,
    )
    def get_files(
        repo: str,
        branch: str | None,
        oauth_token: gr.OAuthToken | None = None,
    ):
        return {
            hf_file: FileExplorer(
                "**/*.gguf",
                file_count = "single",
                root_dir = repo,
                branch = branch,
                token = oauth_token.token if oauth_token else None,
                visible = True,
            ),
            meta_changes: gr.HighlightedText(
                None,
                visible = False,
            ),
            file_meta: gr.Matrix(
                # None, # FIXME (see Dataframe bug below)
                visible = False,
            ),
            meta_keys: gr.Dropdown(
                None,
                visible = False,
            ),
            btn_download: gr.Button(
                visible = False,
            ),
        }

    @gr.on(
        triggers = [
            hf_file.change,
        ],
        inputs = [
            hf_file,
            hf_branch,
        ],
        outputs = [
            meta_state,
        ] + file_change_components,
        show_progress = 'minimal',
    )
    def load_metadata(
        repo_file: str | None,
        branch: str | None,
        progress: gr.Progress = gr.Progress(),
        oauth_token: gr.OAuthToken | None = None,
    ):
        m = []
        meta = init_state()

        yield {
            meta_state: meta,
            file_meta: gr.Matrix(
                [['', '', '']] * 100,  # FIXME: Workaround for Dataframe bug when user has selected data
                visible = True,
            ),
            meta_changes: gr.HighlightedText(
                None,
                visible = False,
            ),
            meta_keys: gr.Dropdown(
                None,
                visible = False,
            ),
            btn_download: gr.Button(
                visible = False,
            ),
        }

        if not repo_file:
            return

        fs = HfFileSystem(
            token = oauth_token.token if oauth_token else None,
        )

        try:
            progress(0, desc = 'Loading file...')

            with fs.open(
                repo_file,
                "rb",
                revision = branch,
                block_size = 8 * 1024 * 1024,
                cache_type = "readahead",
            ) as fp:
                progress(0, desc = 'Reading header...')

                gguf = HuggingGGUFstream(fp)
                num_metadata = gguf.header['metadata'].value
                metadata = gguf.read_metadata()

                meta.var['repo_file'] = repo_file
                meta.var['branch'] = branch

                # *_token_id keys may be read before the token list itself;
                # their display values are fixed up once all keys are in
                deferred_updates = []
                for k, v in progress.tqdm(metadata, desc = 'Reading metadata...', total = num_metadata, unit = f' of {num_metadata} metadata keys...'):
                    human = [*human_readable_metadata(meta, k, v.type, v.value)]
                    if k.endswith('_token_id') and 'tokenizer.ggml.tokens' not in meta.key:
                        deferred_updates.append(((k, v.type, v.value), human))

                    m.append(human)
                    meta.key[k] = (v.type, v.value)

                    # FIXME
                    # yield {
                    #     file_meta: gr.Matrix(
                    #         m,
                    #     ),
                    # }

                for data, human in deferred_updates:
                    human[:] = human_readable_metadata(meta, *data)
        except Exception as e:
            raise gr.Error(str(e))

        yield {
            meta_state: meta,
            file_meta: gr.Matrix(
                m,
            ),
            meta_keys: gr.Dropdown(
                sorted(meta.key.keys() | standard_metadata.keys()),
                value = '',
                visible = True,
            ),
        }
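
    # Editing flow from here on: meta.key mirrors the file's current metadata,
    # meta.add collects pending additions/changes and meta.rem pending
    # removals; notify_state_change() renders these as the highlighted change
    # list and encodes them into the download link.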

    @gr.on(
        triggers = [
            meta_keys.change,
        ],
        inputs = [
            meta_state,
            meta_keys,
        ],
        outputs = [
            meta_types,
            btn_delete,
        ],
    )
    def update_metakey(
        meta: MetadataState,
        key: str | None,
    ):
        typ = None

        if (val := meta.key.get(key, standard_metadata.get(key))) is not None:
            typ = GGUFValueType(val[0]).name
        elif key:
            # Infer the type for well-known key patterns
            if key.startswith('tokenizer.chat_template.'):
                typ = GGUFValueType.STRING.name
            elif key.endswith('_token_id'):
                typ = GGUFValueType.UINT32.name

        return {
            meta_types: gr.Dropdown(
                value = typ,
                interactive = typ is None,
                visible = bool(key),
            ),
            btn_delete: gr.Button(
                visible = key in meta.key,
            ),
        }

    @gr.on(
        triggers = [
            meta_keys.change,
            meta_types.input,
        ],
        inputs = [
            meta_state,
            meta_keys,
            meta_types,
        ],
        outputs = [
            meta_boolean,
            meta_token_select,
            meta_token_type,
            meta_lookup,
            meta_number,
            meta_string,
            meta_array,
        ],
    )
    def update_metatype(
        meta: MetadataState,
        key: str,
        typ: int,
    ):
        val = None
        tokens = meta.key.get('tokenizer.ggml.tokens', (-1, []))[1]

        if (data := meta.key.get(key, standard_metadata.get(key))) is not None:
            typ = data[0]
            val = data[1]
        elif not key:
            typ = None

        do_select_token = False
        do_lookup_token = False
        do_token_type = False
        match key:
            case 'tokenizer.ggml.scores':
                do_select_token = True
            case 'tokenizer.ggml.token_type':
                do_select_token = True
                do_token_type = True
            case s if s.endswith('_token_id'):
                do_lookup_token = True
            case _:
                pass

        if isinstance(val, list) and not do_select_token:
            # TODO: Support arrays?
            typ = GGUFValueType.ARRAY

        match typ:
            case (GGUFValueType.INT8 | GGUFValueType.INT16 | GGUFValueType.INT32 | GGUFValueType.INT64
                    | GGUFValueType.UINT8 | GGUFValueType.UINT16 | GGUFValueType.UINT32 | GGUFValueType.UINT64
                    | GGUFValueType.FLOAT32 | GGUFValueType.FLOAT64):
                is_number = True
            case _:
                is_number = False

        return {
            meta_boolean: gr.Checkbox(
                value = val if typ == GGUFValueType.BOOL and data is not None else False,
                visible = typ == GGUFValueType.BOOL,
            ),
            meta_token_select: gr.Dropdown(
                None,
                value = '',
                visible = do_select_token,
            ),
            meta_token_type: gr.Dropdown(
                interactive = False,
                visible = do_token_type,
            ),
            meta_lookup: gr.Dropdown(
                None,
                value = tokens[val] if is_number and data is not None and do_lookup_token and val < len(tokens) else '',
                visible = is_number and do_lookup_token,
            ),
            meta_number: gr.Number(
                value = val if is_number and data is not None and not do_select_token else 0,
                precision = 10 if typ == GGUFValueType.FLOAT32 or typ == GGUFValueType.FLOAT64 else 0,
                interactive = not do_select_token,
                visible = is_number and not do_token_type,
            ),
            meta_string: gr.Textbox(
                value = val if typ == GGUFValueType.STRING else '',
                visible = typ == GGUFValueType.STRING,
            ),
            meta_array: gr.Matrix(
                visible = typ == GGUFValueType.ARRAY,
            ),
        }

    # FIXME: Disabled for now due to Dataframe bug when user has selected data
    # @gr.on(
    #     triggers = [
    #         file_meta.select,
    #     ],
    #     inputs = [
    #     ],
    #     outputs = [
    #         meta_keys,
    #     ],
    # )
    # def select_metakey(
    #     evt: gr.SelectData,
    # ):
    #     return {
    #         meta_keys: gr.Dropdown(
    #             value = evt.row_value[0] if evt.selected else '',
    #         ),
    #     }

    def notify_state_change(
        meta: MetadataState,
        request: gr.Request,
    ):
        changes = [(k, 'rem') for k in meta.rem]
        for k, v in meta.add.items():
            key, typ, val = human_readable_metadata(meta, k, *v)
            changes.append((k, 'add'))
            changes.append((str(val), None))

        m = []
        for k, v in meta.key.items():
            m.append([*human_readable_metadata(meta, k, v[0], v[1])])

        link = str(request.request.url_for('download', repo_file = meta.var['repo_file']).include_query_params(branch = meta.var['branch']))
        if link.startswith('http:'):
            link = 'https' + link[4:]
        if meta.rem or meta.add:
            link += '&' + urlencode(
                {
                    'rem': meta.rem,
                    'add': [json.dumps([k, *v], ensure_ascii = False, separators = (',', ':')) for k, v in meta.add.items()],
                },
                doseq = True,
                safe = '[]{}:"\',',
            )

        return {
            meta_state: meta,
            meta_changes: gr.HighlightedText(
                changes,
                visible = bool(changes),
            ),
            file_meta: gr.Matrix(
                m,
            ),
            meta_keys: gr.Dropdown(
                sorted(meta.key.keys() | standard_metadata.keys()),
                value = '',
            ),
            btn_download: gr.Button(
                link = link,
                visible = bool(changes),
            ),
        }
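
    # The generated download link looks something like this (illustrative):
    #   https://host/download/org/model/file.gguf?branch=main&rem=old.key&add=["general.name",8,"Model"]
    # where each `add` entry is a JSON-encoded [key, type, value] triple
    # (8 being GGUFValueType.STRING), consumed by the download endpoint below.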

    @gr.on(
        triggers = [
            btn_delete.click,
        ],
        inputs = [
            meta_state,
            meta_keys,
        ],
        outputs = [
        ] + state_change_components,
    )
    def rem_metadata(
        meta: MetadataState,
        key: str,
        request: gr.Request,
    ):
        if key in meta.add:
            del meta.add[key]
        if key in meta.key:
            del meta.key[key]
        meta.rem.add(key)

        return notify_state_change(
            meta,
            request,
        )

    def token_search(
        meta: MetadataState,
        name: str,
    ):
        found = {}
        name = name.lower()
        tokens = meta.key.get('tokenizer.ggml.tokens', (-1, []))[1]

        # Collect the first few tokens whose text contains the search string (id -> token)
        for i, t in enumerate(tokens):
            if name in t.lower():
                found[i] = t
                if len(found) > 5:
                    break

        return found

    @gr.on(
        triggers = [
            meta_token_select.key_up,
        ],
        inputs = [
            meta_state,
        ],
        outputs = [
            meta_token_select,
            token_select_indices,
        ],
        show_progress = 'hidden',
        trigger_mode = 'always_last',
    )
    def token_select(
        meta: MetadataState,
        keyup: gr.KeyUpData,
    ):
        found = token_search(meta, keyup.input_value)

        return {
            meta_token_select: gr.Dropdown(
                list(found.values()),
            ),
            token_select_indices: list(found.keys()),
        }

    @gr.on(
        triggers = [
            meta_token_select.input,
        ],
        inputs = [
            meta_state,
            meta_keys,
            meta_token_select,
            token_select_indices,
        ],
        outputs = [
            meta_token_type,
            meta_number,
        ],
    )
    def token_selected(
        meta: MetadataState,
        key: str,
        choice: int | None,
        indices: list[int],
    ):
        if choice is None or choice < 0 or choice >= len(indices) or (token := indices[choice]) < 0:
            raise gr.Error('Token not found')

        tokens = meta.key.get('tokenizer.ggml.tokens', (-1, []))[1]
        if token >= len(tokens):
            raise gr.Error('Invalid token')

        data = meta.key.get(key, (-1, []))[1]
        match key:
            case 'tokenizer.ggml.scores':
                return {
                    meta_number: gr.Number(
                        value = data[token] if data and len(data) > token else 0.0,
                        interactive = True,
                    ),
                }
            case 'tokenizer.ggml.token_type':
                return {
                    meta_token_type: gr.Dropdown(
                        value = TokenType(data[token]).name if data and len(data) > token else TokenType.NORMAL.name,
                        interactive = True,
                    ),
                }
            case _:
                raise gr.Error('Invalid metadata key')

    @gr.on(
        triggers = [
            meta_lookup.key_up,
        ],
        inputs = [
            meta_state,
        ],
        outputs = [
            meta_lookup,
            token_select_indices,
        ],
        show_progress = 'hidden',
        trigger_mode = 'always_last',
    )
    def token_lookup(
        meta: MetadataState,
        keyup: gr.KeyUpData,
    ):
        found = token_search(meta, keyup.input_value)

        return {
            meta_lookup: gr.Dropdown(
                list(found.values()),
            ),
            token_select_indices: list(found.keys()),
        }
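
    # Shared handler for all the value inputs wired up below. Scores and token
    # types are special-cased: they are stored in meta.add as {token_id: value}
    # patches rather than full arrays, and expanded onto the full array when
    # the file is downloaded.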

    def add_metadata(
        meta: MetadataState,
        key: str,
        typ: int | None,
        val: Any,
        request: gr.Request,
        choice: int | None = None,
        indices: list[int] | None = None,
    ):
        if not key or typ is None:
            if key:
                gr.Warning('Missing required value type')

            return {
                meta_changes: gr.HighlightedText(
                ),
            }

        if key in meta.rem:
            meta.rem.remove(key)

        match key:
            case 'tokenizer.ggml.scores' | 'tokenizer.ggml.token_type':
                if choice is None or choice < 0 or choice >= len(indices) or (token := indices[choice]) < 0:
                    raise gr.Error('Token not found')

                tok = meta.add.setdefault(key, (typ, {}))[1]
                # The token type dropdown is index-based and TokenType values start at 1, hence the + 1
                tok[str(token)] = val + 1 if key == 'tokenizer.ggml.token_type' else val
                data = meta.key.setdefault(key, (typ, [0.0 if key == 'tokenizer.ggml.scores' else int(TokenType.NORMAL)] * len(meta.key.get('tokenizer.ggml.tokens', (-1, []))[1])))[1]
                if data:
                    for k, v in tok.items():
                        data[int(k)] = v
            case _:
                meta.key[key] = meta.add[key] = (typ, val)
                if key.startswith('tokenizer.chat_template.'):
                    template = key[24:]  # len('tokenizer.chat_template.') == 24
                    if template not in meta.key.get('tokenizer.chat_templates', (None, []))[1]:
                        templates = [x[24:] for x in meta.key.keys() if x.startswith('tokenizer.chat_template.')]
                        meta.key['tokenizer.chat_templates'] = meta.add['tokenizer.chat_templates'] = (GGUFValueType.STRING, templates)

        return notify_state_change(
            meta,
            request,
        )

    def token_select_to_id(
        choice: int,
        indices: list[int],
    ):
        if choice < 0 or choice >= len(indices) or (token := indices[choice]) < 0:
            raise gr.Error('Token not found')

        return {
            meta_number: gr.Number(
                token,
            ),
        }

    meta_lookup.input(
        token_select_to_id,
        inputs = [
            meta_lookup,
            token_select_indices,
        ],
        outputs = [
            meta_number,
        ],
    ).success(
        add_metadata,
        inputs = [
            meta_state,
            meta_keys,
            meta_types,
            meta_number,
        ],
        outputs = [
        ] + state_change_components,
    )

    meta_boolean.input(
        add_metadata,
        inputs = [
            meta_state,
            meta_keys,
            meta_types,
            meta_boolean,
        ],
        outputs = [
        ] + state_change_components,
    )

    meta_token_type.input(
        add_metadata,
        inputs = [
            meta_state,
            meta_keys,
            meta_types,
            meta_token_type,
            meta_token_select,
            token_select_indices,
        ],
        outputs = [
        ] + state_change_components,
    )

    meta_number.submit(
        add_metadata,
        inputs = [
            meta_state,
            meta_keys,
            meta_types,
            meta_number,
            meta_token_select,
            token_select_indices,
        ],
        outputs = [
        ] + state_change_components,
    )

    meta_string.submit(
        add_metadata,
        inputs = [
            meta_state,
            meta_keys,
            meta_types,
            meta_string,
        ],
        outputs = [
        ] + state_change_components,
    )

    meta_array.input(
        add_metadata,
        inputs = [
            meta_state,
            meta_keys,
            meta_types,
            meta_array,
        ],
        outputs = [
        ] + state_change_components,
    )


def stream_repo_file(
    repo_file: str,
    branch: str,
    add_meta: list[str] | None,
    rem_meta: list[str] | None,
    token: str | None = None,
):
    fs = HfFileSystem(
        token = token,
    )

    with fs.open(
        repo_file,
        "rb",
        revision = branch,
        block_size = 8 * 1024 * 1024,
        cache_type = "readahead",
    ) as fp:
        if not rem_meta:
            rem_meta = []
        if not add_meta:
            add_meta = []

        gguf = HuggingGGUFstream(fp)
        for _ in gguf.read_metadata():
            pass

        for k in rem_meta:
            gguf.remove_metadata(k)

        tokens = gguf.metadata.get('tokenizer.ggml.tokens')
        for k in add_meta:
            k = json.loads(k)
            if isinstance(k, list) and len(k) == 3:
                if isinstance(k[2], dict):
                    # Per-token patches: expand the {token_id: value} dict onto the full array
                    if tokens:
                        if (data := gguf.metadata.get(k[0])):
                            data = data.value
                        else:
                            data = [0.0 if k[0] == 'tokenizer.ggml.scores' else int(TokenType.NORMAL)] * len(tokens.value)

                        for i, v in k[2].items():
                            data[int(i)] = v

                        k[2] = data
                    else:
                        k[2] = []

                gguf.add_metadata(*k)

        gguf.adjust_padding()

        yield gguf.filesize
        yield b''.join((v.data for k, v in gguf.header.items()))

        for k, v in gguf.metadata.items():
            yield v.data

        while True:
            if not (data := fp.read(65536)):
                break

            yield data
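
# The generator above yields the total file size first (so download() below
# can set Content-Length via next(stream)), then the rebuilt header and
# metadata bytes, and finally streams the remaining tensor data unmodified.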

if __name__ == "__main__":
    blocks.queue(
        max_size = 10,
        default_concurrency_limit = 10,
    )
    app, local_url, share_url = blocks.launch(
        show_api = False,
        prevent_thread_lock = True,
    )

    async def download(
        request: Request,
        repo_file: Annotated[str, Path()],
        branch: Annotated[str, Query()] = "main",
        add: Annotated[list[str] | None, Query()] = None,
        rem: Annotated[list[str] | None, Query()] = None,
    ):
        token = request.session.get('oauth_info', {}).get('access_token')

        # Reject path traversal and anything not shaped like "<org>/<model>/<path to file>"
        if posixpath.normpath(repo_file) != repo_file or '\\' in repo_file or repo_file.startswith('../') or repo_file.startswith('/') or repo_file.count('/') < 2:
            raise HTTPException(
                status_code = 404,
                detail = 'Invalid repository',
            )

        stream = stream_repo_file(
            repo_file,
            branch,
            add,
            rem,
            token = token,
        )
        size = next(stream)

        return StreamingResponse(
            stream,
            headers = {
                'Content-Length': str(size),
            },
            media_type = 'application/octet-stream',
        )

    app.add_api_route(
        "/download/{repo_file:path}",
        download,
        methods = ["GET"],
    )
    # app.openapi_schema = None
    # app.setup()

    blocks.block_thread()