# gemma2_9b_7gb / fn.py
# Hugging Face file-viewer header (page residue, not part of the program):
#   uploader: aka7774 · commit "Upload fn.py" (e75b8ce, verified)
#   virus scan: no virus · size: 5.82 kB
import os
import torch
import json
import gc
import time
from unsloth import FastLanguageModel
from transformers import TextIteratorStreamer
from threading import Thread
# Switch off parallelism in the Hugging Face `tokenizers` library through its
# environment variable.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Module-level singletons; both stay None until load_model() fills them in.
model = None
tokenizer = None

# Baseline settings. `cfg` below is the live, mutable copy that the rest of
# the module reads and updates; this dict is only ever copied.
# NOTE(review): max_length (tokenizer truncation) is larger than
# max_seq_length (passed to the model loader) - confirm this is intended.
default_cfg = dict(
    model_name="unsloth/gemma-2-9b-it-bnb-4bit",
    dtype=None,
    instruction=None,
    inst_template=None,
    chat_template=None,
    max_length=2400,
    max_seq_length=2048,
    max_new_tokens=512,
    temperature=0.9,
    top_p=0.95,
    top_k=40,
    repetition_penalty=1.2,
)

# Shallow copy so that edits to the live config never touch the defaults.
cfg = dict(default_cfg)
def load_model(model_name, dtype):
    """Load ``model_name`` into the module-level ``model`` / ``tokenizer``.

    Does nothing when a model is already loaded and ``cfg`` records the same
    ``model_name`` and ``dtype``. Otherwise the current model and tokenizer
    are released before the new ones are loaded.

    Args:
        model_name: Identifier handed to ``FastLanguageModel.from_pretrained``.
        dtype: Quantization mode. ``'4bit'`` or ``'8bit'`` turns on the
            matching ``load_in_*`` flag; any other value turns on neither.
            The torch dtype itself is always requested as ``torch.bfloat16``.
    """
    global tokenizer, model, cfg

    # `model is not None` is required: `cfg` names the default model before
    # anything has been loaded, and it still names the previous model if an
    # earlier load failed after the old one was already released. Comparing
    # `cfg` alone would then return with no model in memory.
    already_loaded = (
        model is not None
        and cfg['model_name'] == model_name
        and cfg['dtype'] == dtype
    )
    if already_loaded:
        return

    # Drop our references first so the old weights can be reclaimed before
    # the new ones are allocated.
    model = None
    tokenizer = None
    gc.collect()
    torch.cuda.empty_cache()

    model, tokenizer = FastLanguageModel.from_pretrained(
        model_name,
        max_seq_length=cfg['max_seq_length'],
        dtype=torch.bfloat16,
        load_in_8bit=(dtype == '8bit'),
        load_in_4bit=(dtype == '4bit'),
    )
    FastLanguageModel.for_inference(model)

    # Record what is resident only after the load succeeded.
    cfg['model_name'] = model_name
    cfg['dtype'] = dtype
def clear_config():
    """Reset the live configuration to ``default_cfg``.

    ``model_name`` and ``dtype`` are carried over instead of being reset:
    ``load_model`` compares them against the requested model to decide
    whether a reload is needed, so they have to keep describing the model
    that is actually in memory. Resetting them would either force a needless
    reload or make ``load_model`` skip a reload while a different model is
    still resident.
    """
    global cfg
    resident = {key: cfg[key] for key in ('model_name', 'dtype')}
    cfg = default_cfg.copy()
    cfg.update(resident)
def set_config(model_name, dtype, instruction, inst_template, chat_template, max_new_tokens, temperature, top_p, top_k, repetition_penalty):
    """Load the requested model, then store prompt and sampling settings.

    The numeric settings are coerced (``int`` for ``max_new_tokens`` and
    ``top_k``, ``float`` for the rest) before anything is written to ``cfg``,
    so a value that cannot be converted leaves ``cfg`` untouched.

    Returns:
        The string ``'done.'``.
    """
    load_model(model_name, dtype)

    # Convert everything up front; the single update below is all-or-nothing.
    sampling = {
        'max_new_tokens': int(max_new_tokens),
        'temperature': float(temperature),
        'top_p': float(top_p),
        'top_k': int(top_k),
        'repetition_penalty': float(repetition_penalty),
    }
    cfg.update(
        instruction=instruction,
        inst_template=inst_template,
        chat_template=chat_template,
        **sampling,
    )
    return 'done.'
def set_config_args(args):
    """Apply a dict of settings to ``cfg``, loading the model it names.

    Args:
        args: Mapping of config keys to values. ``model_name`` and ``dtype``
            are optional; when one is missing the value currently stored in
            ``cfg`` is used, so a partial update (for example only
            ``temperature``) no longer raises ``KeyError``.

    Returns:
        The string ``'done.'``.
    """
    load_model(
        args.get('model_name', cfg['model_name']),
        args.get('dtype', cfg['dtype']),
    )
    # Every key in `args` is copied into cfg as-is; no validation is done.
    cfg.update(args)
    return 'done.'
def chatinterface_to_messages(message, history):
    """Flatten paired chat history into a list of role/content dicts.

    Args:
        message: Newest user message; appended last when truthy.
        history: Iterable of two-item ``(user, assistant)`` entries. Falsy
            halves of a pair are skipped.

    Returns:
        A list of ``{'role': ..., 'content': ...}`` dicts. When
        ``cfg['instruction']`` is set, the list opens with the instruction as
        a user turn followed by a fixed assistant acknowledgement.
    """
    instruction = cfg['instruction']
    messages = []

    if instruction:
        messages += [
            {'role': 'user', 'content': instruction},
            {'role': 'assistant', 'content': 'I understand.'},
        ]

    for user, assistant in history:
        for role, content in (('user', user), ('assistant', assistant)):
            if content:
                messages.append({'role': role, 'content': content})

    if message:
        messages.append({'role': 'user', 'content': message})

    return messages
def apply_template(message, history, args):
    """Build the prompt string for one generation request.

    Args:
        message: Newest user message (may be ``None`` or empty).
        history: Earlier turns as ``(user, assistant)`` pairs (may be empty
            or ``None``).
        args: Per-request options. Recognised keys:
            ``'input'`` replaces ``message``;
            ``'instruction'`` is written into ``cfg['instruction']``;
            ``'messages'`` is a ready-made conversation, used when there is
            no message.

    Returns:
        The prompt text. When there is a message and ``cfg['inst_template']``
        is set, this is that template formatted with ``instruction`` and
        ``input``. Otherwise it is the tokenizer's chat template applied to
        the conversation, with the generation prompt appended.
    """
    if 'input' in args:
        message = args['input']
    if 'instruction' in args:
        # NOTE: this writes to the module-level cfg, so the override stays in
        # effect for later requests as well.
        cfg['instruction'] = args['instruction']

    if cfg['chat_template']:
        tokenizer.chat_template = cfg['chat_template']

    if message and cfg['inst_template']:
        return cfg['inst_template'].format(instruction=cfg['instruction'], input=message)

    if message:
        # Previously a truthy message rebuilt a single-turn conversation here
        # and silently dropped the history that had just been converted.
        # chatinterface_to_messages adds the same instruction preamble and
        # appends the message, so with empty history the result is identical
        # to the old single-turn conversation.
        messages = chatinterface_to_messages(message, history or [])
    elif 'messages' in args:
        messages = args['messages']
    elif history:
        messages = chatinterface_to_messages(message, history)
    else:
        # A conversation is a list of messages; this used to be a dict.
        messages = []

    return tokenizer.apply_chat_template(conversation=messages, add_generation_prompt=True, tokenize=False)
def chat(message=None, history=None, args=None):
    """Generate a reply and stream it back as it is produced.

    Args:
        message: Newest user message.
        history: Earlier turns; ``None`` is treated as no history.
        args: Per-request options forwarded to ``apply_template``; ``None``
            is treated as no options. If the key ``'fastapi'`` is present,
            only newly generated text is yielded.

    Yields:
        With ``'fastapi'`` in ``args``: each new chunk of text.
        Otherwise: the full text generated so far, once per chunk.
    """
    # None defaults replace the former shared mutable `[]` / `{}` defaults.
    history = [] if history is None else history
    args = {} if args is None else args

    prompt = apply_template(message, history, args)
    # Put the inputs on the model's own device rather than a hard-coded
    # "cuda", matching what numel() does.
    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        padding=True,
        max_length=cfg['max_length'],
        truncation=True,
    ).to(model.device)

    streamer = TextIteratorStreamer(
        tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True,
    )

    generate_kwargs = dict(
        inputs,
        do_sample=True,
        streamer=streamer,
        num_beams=1,
    )
    # Falsy settings (None, 0) are left out so generate() uses its defaults.
    sampling_keys = (
        'max_new_tokens',
        'temperature',
        'top_p',
        'top_k',
        'repetition_penalty',
    )
    generate_kwargs.update({k: cfg[k] for k in sampling_keys if cfg[k]})

    # generate() blocks until it finishes, so it runs in a worker thread
    # while this generator drains the streamer.
    worker = Thread(target=model.generate, kwargs=generate_kwargs)
    worker.start()

    deltas_only = 'fastapi' in args
    model_output = ""
    for new_text in streamer:
        model_output += new_text
        if deltas_only:
            # FastAPI callers want only the newly generated piece.
            yield new_text
        else:
            # Gradio wants the complete text so far on every update.
            yield model_output

    # The streamer is exhausted once generation has ended; reap the worker.
    worker.join()
def infer(message=None, history=None, args=None):
    """Generate a complete reply and return it as one string.

    Args:
        message: Newest user message.
        history: Earlier turns; ``None`` is treated as no history.
        args: Per-request options forwarded to ``apply_template``; ``None``
            is treated as no options.

    Returns:
        The decoded text of the newly generated tokens only (the prompt
        tokens are sliced off), with special tokens removed.
    """
    # None defaults replace the former shared mutable `[]` / `{}` defaults.
    history = [] if history is None else history
    args = {} if args is None else args

    prompt = apply_template(message, history, args)
    # Put the inputs on the model's own device rather than a hard-coded
    # "cuda", matching what numel() does.
    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        padding=True,
        max_length=cfg['max_length'],
        truncation=True,
    ).to(model.device)

    generate_kwargs = dict(
        inputs,
        do_sample=True,
        num_beams=1,
        use_cache=True,
    )
    # Falsy settings (None, 0) are left out so generate() uses its defaults.
    sampling_keys = (
        'max_new_tokens',
        'temperature',
        'top_p',
        'top_k',
        'repetition_penalty',
    )
    generate_kwargs.update({k: cfg[k] for k in sampling_keys if cfg[k]})

    output_ids = model.generate(**generate_kwargs)

    # The output begins with the prompt; keep only what came after it. The
    # slice is taken on the tensor so only the new tokens become a list.
    prompt_len = inputs['input_ids'].size(1)
    new_token_ids = output_ids[0][prompt_len:].tolist()
    return tokenizer.decode(new_token_ids, skip_special_tokens=True)
def numel(message=None, history=None, args=None):
    """Return how many token ids the prompt for this request tokenizes to.

    The prompt is built exactly as ``chat`` / ``infer`` build it, but it is
    tokenized here without truncation.

    Args:
        message: Newest user message.
        history: Earlier turns; ``None`` is treated as no history.
        args: Per-request options forwarded to ``apply_template``; ``None``
            is treated as no options.

    Returns:
        The element count of the ``input_ids`` tensor, as an ``int``.
    """
    # None defaults replace the former shared mutable `[]` / `{}` defaults.
    history = [] if history is None else history
    args = {} if args is None else args

    prompt = apply_template(message, history, args)
    model_inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    return torch.numel(model_inputs['input_ids'])
# Load the default model in 4-bit mode as soon as this module is imported.
load_model(model_name=cfg['model_name'], dtype='4bit')