"""Streamlit app that redacts personal information from uploaded files.

PDF uploads are redacted in place (black boxes drawn over the detected
words); DOCX and plain-text uploads are rewritten sentence by sentence by a
seq2seq model and offered back as a text file.
"""

import re

import fitz  # PyMuPDF
import nltk
import streamlit as st
from docx import Document
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, RecognizerResult, Pattern
from transformers import AutoModelForSeq2SeqLM
from transformers import AutoTokenizer
from transformers import pipeline

nltk.download('punkt')


def sentence_tokenize(text):
    """Split *text* into a list of sentences using NLTK."""
    return nltk.sent_tokenize(text)


model_dir_large = 'edithram23/Redaction_Personal_info_v1'
tokenizer_large = AutoTokenizer.from_pretrained(model_dir_large)
model_large = AutoModelForSeq2SeqLM.from_pretrained(model_dir_large)
pipe1 = pipeline("token-classification", model="edithram23/new-bert-v2")

# Disabled alternative: a smaller seq2seq model for short inputs.
# model_dir_small = 'edithram23/Redaction'
# tokenizer_small = AutoTokenizer.from_pretrained(model_dir_small)
# model_small = AutoModelForSeq2SeqLM.from_pretrained(model_dir_small)
# def small(text, model=model_small, tokenizer=tokenizer_small):
#     inputs = ["Mask Generation: " + text.lower() + '.']
#     inputs = tokenizer(inputs, max_length=256, truncation=True, return_tensors="pt")
#     output = model.generate(**inputs, num_beams=8, do_sample=True, max_length=len(text))
#     decoded_output = tokenizer.batch_decode(output, skip_special_tokens=True)[0]
#     predicted_title = decoded_output.strip()
#     pattern = r'\[.*?\]'
#     redacted_text = re.sub(pattern, '[redacted]', predicted_title)
#     return redacted_text

# Initialize the analyzer engine
analyzer = AnalyzerEngine()

# Define a custom address recognizer using a regex pattern
address_pattern = Pattern(
    name="address",
    regex=r"\d+\s\w+\s(?:street|st|road|rd|avenue|ave|lane|ln|drive|dr|blvd|boulevard)\s*\w*",
    score=0.5,
)
address_recognizer = PatternRecognizer(supported_entity="ADDRESS", patterns=[address_pattern])

# Add the custom address recognizer to the analyzer
analyzer.registry.add_recognizer(address_recognizer)

# Analyzer entity type -> key used in the dict returned by extract_entities().
# Entity types not listed here are ignored.
_ENTITY_LABELS = {
    "PERSON": "NAME",
    "PHONE_NUMBER": "PHONE_NUMBER",
    "EMAIL_ADDRESS": "EMAIL",
    "ADDRESS": "ADDRESS",
    "LOCATION": "LOCATION",
    "IN_AADHAAR": "IN_AADHAAR",
}


def combine_words(entities):
    """Merge adjacent token-classification hits into whole words/phrases.

    Args:
        entities: iterable of dicts carrying at least ``'word'``, ``'start'``
            and ``'end'`` keys.

    Returns:
        A list of copies of the input dicts. A hit that starts exactly where
        the previous one ends is glued on directly; a hit that starts one
        character later is joined with a single space. ``'##'`` markers are
        stripped from every word. All other keys keep the value of the first
        hit in each merged run.
    """
    combined_entities = []
    current_entity = None
    for entity in entities:
        word = entity['word'].replace('##', '')
        if current_entity is not None:
            if current_entity['end'] == entity['start']:
                # Contiguous piece: append without a space.
                current_entity['word'] += word
                current_entity['end'] = entity['end']
                continue
            if current_entity['end'] + 1 == entity['start']:
                # One character apart: treat the gap as a space.
                current_entity['word'] += ' ' + word
                current_entity['end'] = entity['end']
                continue
            # Not adjacent: the running entity is complete.
            combined_entities.append(current_entity)
        current_entity = entity.copy()
        current_entity['word'] = word
    if current_entity is not None:
        combined_entities.append(current_entity)
    return combined_entities


def words_red_bert(text):
    """Return the words flagged by the token-classification pipeline.

    The text is processed sentence by sentence. Merged hits are kept when
    their ``'entity'`` label is not ``'none'``, the word is longer than one
    character and it is not the literal ``', '``.
    """
    final = []
    for sentence in sentence_tokenize(text):
        for hit in combine_words(pipe1(sentence)):
            word = hit['word']
            if hit['entity'] != 'none' and len(word) > 1 and word != ', ':
                final.append(word)
    return final


def extract_entities(text):
    """Run the analyzer over *text* and collect the matched substrings.

    Returns:
        ``(entities, output)`` where ``entities`` maps each of ``NAME``,
        ``PHONE_NUMBER``, ``EMAIL``, ``ADDRESS``, ``LOCATION`` and
        ``IN_AADHAAR`` to the list of matched substrings of that kind, and
        ``output`` is the flat list of all those substrings in the order the
        analyzer reported them.
    """
    entities = {label: [] for label in _ENTITY_LABELS.values()}
    output = []
    # Analyze the text for PII
    for result in analyzer.analyze(text=text, language='en'):
        label = _ENTITY_LABELS.get(result.entity_type)
        if label is None:
            continue
        match = text[result.start:result.end]
        # IN_AADHAAR hits used to be appended under the missing key 'IN_PAN',
        # which raised KeyError; they now land in their own list.
        entities[label].append(match)
        output.append(match)
    return entities, output


def mask_generation(text, model=model_large, tokenizer=tokenizer_large):
    """Rewrite *text* with the seq2seq model and normalise its mask tokens.

    The lower-cased text is sent to the model behind a ``"Mask Generation: "``
    prefix, and every ``[...]`` span in the decoded output is replaced by
    ``[redacted]``.

    Note: a period is appended to inputs shorter than 90 characters and a
    further period is always appended to the prompt, so short inputs reach
    the model with two trailing periods. Generation samples
    (``do_sample=True``), so the output is not deterministic.
    """
    if len(text) < 90:
        text = text + '.'
        # return small(text)
    inputs = ["Mask Generation: " + text.lower() + '.']
    inputs = tokenizer(inputs, max_length=512, truncation=True, return_tensors="pt")
    # NOTE(review): max_length is given in characters of the input here,
    # while generate() counts tokens -- confirm this cap is intended.
    output = model.generate(**inputs, num_beams=8, do_sample=True, max_length=len(text))
    decoded_output = tokenizer.batch_decode(output, skip_special_tokens=True)[0]
    predicted_title = decoded_output.strip()
    pattern = r'\[.*?\]'
    return re.sub(pattern, '[redacted]', predicted_title)


def redact_text(page, text):
    """Black out every occurrence of *text* on a PDF *page*."""
    for inst in page.search_for(text):
        page.add_redact_annot(inst, fill=(0, 0, 0))
    page.apply_redactions()


def read_pdf(file):
    """Open an uploaded PDF and return ``(full_text, document)``."""
    pdf_document = fitz.open(stream=file.read(), filetype="pdf")
    text = "".join(
        pdf_document.load_page(page_num).get_text()
        for page_num in range(len(pdf_document))
    )
    return text, pdf_document


def read_docx(file):
    """Return the paragraphs of an uploaded DOCX joined by newlines."""
    doc = Document(file)
    return "\n".join(para.text for para in doc.paragraphs)


def read_txt(file):
    """Return the content of an uploaded text file decoded as UTF-8."""
    return file.read().decode("utf-8")


def process_file(file):
    """Read an uploaded file according to its MIME type.

    Returns:
        ``(text, pdf_document)``; ``pdf_document`` is ``None`` for anything
        that is not a PDF. Unknown types yield the literal text
        ``"Unsupported file type."``.
    """
    if file.type == "application/pdf":
        return read_pdf(file)
    if file.type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
        return read_docx(file), None
    if file.type == "text/plain":
        return read_txt(file), None
    return "Unsupported file type.", None


def _t5_changed_words(sentence):
    """Return lower-cased words of *sentence* absent from the model rewrite."""
    masked = mask_generation(sentence)
    source_words = []
    for line in set(sentence.lower().replace('.', ' ').split("\n")):
        for chunk in line.split(" "):
            source_words += chunk.split(',')
    masked_words = []
    for chunk in masked.lower().replace('.', ' ').split(' '):
        masked_words += chunk.split(',')
    return set(source_words).difference(masked_words)


def _redaction_terms(sentence, *, include_t5=False):
    """Return the strings to black out for one sentence, longest first.

    Analyzer matches (split on newlines) and token-classifier words are
    combined and anything of three characters or fewer is dropped.

    Args:
        include_t5: also add the words the seq2seq model changed. The script
            used to compute these for every sentence but rebuilt the list
            without them, so the default leaves them out and skips the model
            call. NOTE(review): enable if they were meant to be redacted.
    """
    terms = []
    if include_t5:
        terms += _t5_changed_words(sentence)
    _, analyzer_words = extract_entities(sentence)
    for match in analyzer_words:
        terms += match.split('\n')
    terms += words_red_bert(sentence)
    terms = [term for term in terms if len(term) > 3]
    # Longest first, so a long phrase is covered before any shorter term
    # contained in it.
    return sorted(terms, key=len, reverse=True)


st.title("Redaction")
uploaded_file = st.file_uploader("Upload a file", type=["pdf", "docx", "txt"])

if uploaded_file is not None:
    file_contents, pdf_document = process_file(uploaded_file)
    if pdf_document:
        for pg in pdf_document:
            for sent in sentence_tokenize(pg.get_text()):
                for term in _redaction_terms(sent):
                    redact_text(pg, term)
        # Serialize in memory rather than through a fixed on-disk path that
        # concurrent sessions would share.
        st.download_button(
            label="Download Processed PDF",
            data=pdf_document.tobytes(),
            file_name="processed_file.pdf",
            mime="application/pdf",
        )
    else:
        processed_text = "".join(
            mask_generation(sentence) + '\n'
            for sentence in sentence_tokenize(file_contents)
        )
        st.text_area("OUTPUT", processed_text, height=400)
        st.download_button(
            label="Download Processed File",
            data=processed_text,
            file_name="processed_file.txt",
            mime="text/plain",
        )