From 92059857382fc5a1cc4bc742d3e16dcd1bec0222 Mon Sep 17 00:00:00 2001
From: Stan Ng
Date: Mon, 17 Jun 2024 15:27:56 -0700
Subject: [PATCH] Change sensitive fields to sensitive data types.

We will now return a dictionary of sensitive data types, with a list of
the matching field names.
---
 CHANGELOG.md                         |   4 +-
 README.md                            |  15 ++-
 formfyxer/lit_explorer.py            | 166 ++++++++++++++++-----------
 formfyxer/tests/test_lit_explorer.py | 100 +++++++++++-----
 4 files changed, 181 insertions(+), 104 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ff374f2..edc9631 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,7 @@
 # CHANGELOG
 
-## Version v0.2.0
+## Version v0.3.0
 
 ### Added
 * Add warning when sensitive fields are detected by @codestronger in https://github.com/SuffolkLITLab/RateMyPDF/issues/25
 
@@ -12,7 +12,7 @@ N/A
 ### Fixed
 N/A
 
-**Full Changelog**: https://github.com/SuffolkLITLab/FormFyxer/compare/v0.2.0...v0.3.0a2
+**Full Changelog**: https://github.com/SuffolkLITLab/FormFyxer/compare/v0.2.0...v0.3.0
 
 ## Version v0.2.0
 
diff --git a/README.md b/README.md
index 02b5d01..cd1b57d 100644
--- a/README.md
+++ b/README.md
@@ -448,17 +448,22 @@ An object grouping together similar field names.
 
 
 
-### formfyxer.get_sensitive_fields(fields)
-Given a list of fields, identify those related to sensitive information. Sensitive fields include Social Security Number(SSN)), Driver's License (DL), and account numbers.
+### formfyxer.get_sensitive_data_types(fields, fields_old)
+Given a list of fields, identify those related to sensitive information and return a dictionary that groups the matching field names by sensitive data type. A list of the old field names can also be provided; it must be in the same order as `fields`. Passing the old field names lets the matching algorithm catch sensitive fields whose new names are too generic to detect on their own. The return value never contains an old field name, only the corresponding field name from the first parameter.
+
+The sensitive data types are: Bank Account Number, Credit Card Number, Driver's License Number, and Social Security Number.
 #### Parameters:
 * **fields : List[str]** List of field names.
+* **fields_old : Optional[List[str]]** Previous names for the same fields, in the same order as `fields`.
 #### Returns:
-List of sensitive fields found within the fields passed in.
+A dictionary mapping each sensitive data type found to the list of matching field names.
 #### Example:
 ```python
 >>> import formfyxer
->>> formfyxer.get_sensitive_fields(["users1_name", "users1_address", "users1_ssn", "users1_routing_number"])
-['Social Security Number', 'Bank Account Number']
+>>> formfyxer.get_sensitive_data_types(["users1_name", "users1_address", "users1_ssn", "users1_routing_number"])
+{'Social Security Number': ['users1_ssn'], 'Bank Account Number': ['users1_routing_number']}
+>>> formfyxer.get_sensitive_data_types(["user_ban1", "user_credit_card_number", "user_cvc", "user_cdl", "user_social_security"], ["old_bank_account_number", "old_credit_card_number", "old_cvc", "old_drivers_license", "old_ssn"])
+{'Bank Account Number': ['user_ban1'], 'Credit Card Number': ['user_credit_card_number', 'user_cvc'], "Driver's License Number": ['user_cdl'], 'Social Security Number': ['user_social_security']}
 ```
 
 [back to top](#formfyxer)
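One case the README examples above leave implicit: when `fields_old` is given, it must pair one-to-one with `fields`. The implementation diff below adds an explicit `ValueError` for mismatched lengths; a minimal sketch of what callers should expect (assuming this branch of FormFyxer is installed):

```python
import formfyxer

# fields_old must line up one-to-one with fields; a mismatch raises ValueError.
try:
    formfyxer.get_sensitive_data_types(
        ["users1_ssn", "users1_routing_number"],
        ["old_ssn"],  # one entry short of the fields list above
    )
except ValueError as err:
    print(err)  # If provided, fields_old must have the same number of items as fields.
```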
diff --git a/formfyxer/lit_explorer.py b/formfyxer/lit_explorer.py
index b2e6f31..0e62efb 100644
--- a/formfyxer/lit_explorer.py
+++ b/formfyxer/lit_explorer.py
@@ -34,7 +34,7 @@
     FieldType,
     unlock_pdf_in_place,
     is_tagged,
-) 
+)
 
 try:
     from nltk.corpus import stopwords
@@ -131,18 +131,20 @@
     with open(
         os.path.join(os.path.dirname(__file__), "keys", "openai_key.txt"), "r"
     ) as in_file:
-        default_key:Optional[str] = in_file.read().rstrip()
+        default_key: Optional[str] = in_file.read().rstrip()
 except:
     default_key = None
 try:
     with open(
         os.path.join(os.path.dirname(__file__), "keys", "openai_org.txt"), "r"
     ) as in_file:
-        default_org:Optional[str] = in_file.read().rstrip()
+        default_org: Optional[str] = in_file.read().rstrip()
 except:
     default_org = None
 if default_key:
-    client:Optional[OpenAI] = OpenAI(api_key=default_key, organization=default_org or None)
+    client: Optional[OpenAI] = OpenAI(
+        api_key=default_key, organization=default_org or None
+    )
 elif os.getenv("OPENAI_API_KEY"):
     client = OpenAI()
 else:
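The hunk above only re-formats the credential bootstrapping, but the flow it preserves is worth spelling out: at import time the module first tries key files bundled with the package, then falls back to the `OPENAI_API_KEY` environment variable, and otherwise leaves `client` as `None`. A sketch of the two setup options (the `sk-...` value is a placeholder, not a real key):

```python
# Option 1: key files read at import time (the org file is optional).
#   formfyxer/keys/openai_key.txt
#   formfyxer/keys/openai_org.txt

# Option 2: environment variable, set before the module is first imported.
import os

os.environ["OPENAI_API_KEY"] = "sk-..."  # placeholder

import formfyxer.lit_explorer  # the module-level OpenAI client is built here
```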
@@ -160,6 +162,7 @@
     CURRENT_DIRECTORY, "data", "simplified_words.yml"
 )
 
+
 # This creates a timeout exception that can be triggered when something hangs too long.
 class TimeoutException(Exception):
     pass
@@ -429,18 +432,19 @@ def normalize_name(
     not, to a snake_case variable name of appropriate length.
 
     HACK: temporarily all we do is re-case it and normalize it using regex rules.
-    Will be replaced with call to LLM soon. 
+    Will be replaced with call to LLM soon.
     """
-    
+
     if this_field not in included_fields:
         this_field = re_case(this_field)
         this_field = regex_norm_field(this_field)
 
     if this_field in included_fields:
         return f"*{this_field}", 0.01
-    
+
     return reformat_field(this_field, tools_token=tools_token), 0.5
 
+
 # Take a list of AL variables and spits out suggested groupings. Here's what's going on:
 #
 # 1. It reads in a list of fields (e.g., `["user_name","user_address"]`)
@@ -652,23 +656,21 @@ def classify_field(field: FieldInfo, new_name: str) -> AnswerType:
     return AnswerType.GATHERED
 
 
-def get_adjusted_character_count(
-    field: FieldInfo
-)-> float:
+def get_adjusted_character_count(field: FieldInfo) -> float:
     """
-    Determines the bracketed length of an input field based on its max_length attribute, 
-    returning a float representing the approximate length of the field content. 
+    Determines the bracketed length of an input field based on its max_length attribute,
+    returning a float representing the approximate length of the field content.
 
     The function chunks the answers into 5 different lengths (checkboxes, 2 words, short, medium, and long)
     instead of directly using the character count, as forms can allocate different spaces for the same data
     without considering the space the user actually needs.
 
     Args:
-        field (FieldInfo): An object containing information about the input field, 
+        field (FieldInfo): An object containing information about the input field,
             including the "max_length" attribute.
 
     Returns:
-        float: The approximate length of the field content, categorized into checkboxes, 2 words, short, 
+        float: The approximate length of the field content, categorized into checkboxes, 2 words, short,
             medium, or long based on the max_length attribute.
 
     Examples:
@@ -694,10 +696,8 @@ def get_adjusted_character_count(
     )  # Anything over 10 lines probably needs a full page but form author skimped on space
     if field["type"] != InputType.TEXT:
         return ONE_WORD
-    
-    if field["max_length"] <= ONE_LINE or (
-        field["max_length"] <= ONE_LINE * 2
-    ):
+
+    if field["max_length"] <= ONE_LINE or (field["max_length"] <= ONE_LINE * 2):
         return ONE_WORD * 2
     elif field["max_length"] <= SHORT_ANSWER:
         return SHORT_ANSWER
@@ -816,7 +816,12 @@ class OpenAiCreds(TypedDict):
     key: str
 
 
-def text_complete(prompt:str, max_tokens:int=500, creds: Optional[OpenAiCreds] = None, temperature:float=0) -> str:
+def text_complete(
+    prompt: str,
+    max_tokens: int = 500,
+    creds: Optional[OpenAiCreds] = None,
+    temperature: float = 0,
+) -> str:
     """Run a prompt via openAI's API and return the result.
 
     Args:
@@ -836,16 +841,13 @@ def text_complete(prompt:str, max_tokens:int=500, creds: Optional[OpenAiCreds] =
         response = openai_client.chat.completions.create(
             model="gpt-3.5-turbo",
             messages=[
-                {
-                    "role": "system",
-                    "content": prompt
-                },
+                {"role": "system", "content": prompt},
             ],
             temperature=temperature,
             max_tokens=max_tokens,
             top_p=1.0,
             frequency_penalty=0.0,
-            presence_penalty=0.0
+            presence_penalty=0.0,
         )
         return str((response.choices[0].message.content or "").strip())
     except Exception as ex:
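For context on the call being re-formatted above: `text_complete` wraps a single system-message completion against gpt-3.5-turbo and returns the response text. A minimal usage sketch — the prompt text is illustrative, and calling without `creds` presumably falls back to the module-level client configured earlier (that fallback happens outside the lines shown in this hunk):

```python
from formfyxer.lit_explorer import text_complete

# temperature=0 keeps the completion close to deterministic, which suits
# form analysis better than creative sampling.
summary = text_complete(
    "Describe the purpose of this court form in one sentence: ...",
    max_tokens=100,
)
print(summary)
```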
@@ -972,38 +974,52 @@ def get_citations(text: str, tokenized_sentences: List[str]) -> List[str]:
     return citations_with_context
 
 
-def get_sensitive_fields(fields: List[str]) -> List[str]:
+# NOTE: omitting "CID" for Credit Card IDs since it has a lot of false positives.
+FIELD_PATTERNS = {
+    "Bank Account Number": [
+        r"account[\W_]*number",
+        r"ABA$",
+        r"routing[\W_]*number",
+        r"checking",
+    ],
+    "Credit Card Number": [r"credit[\W_]*card", r"(CV[CDV]2?|CCV|CSC)"],
+    "Driver's License Number": [r"drivers[\W_]*license", r".?DL$"],
+    "Social Security Number": [r"social[\W_]*security[\W_]*number", r"SSN", r"TIN$"],
+}
+FIELD_REGEXES = {
+    name: re.compile("|".join(patterns), re.IGNORECASE | re.MULTILINE)
+    for name, patterns in FIELD_PATTERNS.items()
+}
+
+
+def get_sensitive_data_types(
+    fields: List[str], fields_old: Optional[List[str]] = None
+) -> Dict[str, List[str]]:
     """
-    Given a list of fields, identify those related to sensitive information. Sensitive fields include
-    Social Security Number(SSN)), Driver's License (DL), and account numbers.
+    Given a list of fields, identify those related to sensitive information and return a dictionary that groups the
+    matching field names by sensitive data type. A list of the old field names can also be provided; it must be in
+    the same order as fields. Passing the old field names lets the matching algorithm catch sensitive fields whose
+    new names are too generic to detect on their own. The return value never contains an old field name, only the
+    corresponding field name from the first parameter.
+
+    The sensitive data types are: Bank Account Number, Credit Card Number, Driver's License Number, and Social
+    Security Number.
     """
-    # NOTE: omitting CID since it has a lot of false positives.
-    field_patterns = {
-        "Social Security Number": [
-            "social[\W_]*security[\W_]*number",
-            "SSN",
-            "TIN$"
-        ],
-        "Bank Account Number": [
-            "account[\W_]*number",
-            "ABA$",
-            "routing[\W_]*number",
-            "checking"
-        ],
-        "Credit Card Number": [
-            "credit[\W_]*card",
-            "(CV[CDV]2?|CCV|CSC)"
-        ],
-        "Driver's License Number": [
-            "drivers[\W_]*license",
-            ".?DL$"
-        ]
-    }
-    text = "\n".join(fields)
-    field_regexes = {name: re.compile("|".join(patterns), re.IGNORECASE | re.MULTILINE) for name, patterns in field_patterns.items()}
-    sensitive_fields = [name for name, regex in field_regexes.items() if re.search(regex, text)]
-    return sensitive_fields
+    if fields_old is not None and len(fields) != len(fields_old):
+        raise ValueError(
+            "If provided, fields_old must have the same number of items as fields."
+        )
+
+    sensitive_data_types: Dict[str, List[str]] = {}
+    for i, field in enumerate(fields):
+        for name, regex in FIELD_REGEXES.items():
+            if re.search(regex, field) or (
+                fields_old is not None and re.search(regex, fields_old[i])
+            ):
+                sensitive_data_types.setdefault(name, []).append(field)
+    return sensitive_data_types
+
 
 def substitute_phrases(
     input_string: str, substitution_phrases: Dict[str, str]
 ) -> Tuple[str, List[Tuple[int, int]]]:
@@ -1037,7 +1053,9 @@ def substitute_phrases(
 
     # Find all matches for the substitution phrases
     for original, replacement in sorted_phrases:
-        for match in re.finditer(r"\b" + re.escape(original) + r"\b", input_string, re.IGNORECASE):
+        for match in re.finditer(
+            r"\b" + re.escape(original) + r"\b", input_string, re.IGNORECASE
+        ):
             matches.append((match.start(), match.end(), replacement))
 
     # Sort the matches based on their starting position
@@ -1141,7 +1159,11 @@
     except:
         readability = -1
     # Still attempt to re-evaluate if not using openai
-    if not original_text or (openai_creds and description == "abortthisnow.") or readability > 30:
+    if (
+        not original_text
+        or (openai_creds and description == "abortthisnow.")
+        or readability > 30
+    ):
         # We do not care what the PDF output is, doesn't add that much time
         ocr_p = [
             "ocrmypdf",
@@ -1231,10 +1253,7 @@
         classify_field(field, new_names[index])
         for index, field in enumerate(field_types)
     ]
-    # NOTE: we send both the original and the cleaned up field names. There are cases where one or the other is cleaner.
-    # Since the sensitive fields are tagged as a group name rather than individual field names, it does no harm to send
-    # more variations to help detection.
-    sensitive_fields = get_sensitive_fields(field_names + new_names)
+    sensitive_data_types = get_sensitive_data_types(new_names, field_names)
 
     slotin_count = sum(1 for c in classified if c == AnswerType.SLOT_IN)
     gathered_count = sum(1 for c in classified if c == AnswerType.GATHERED)
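Note the argument order in the hunk above: `parse_form` passes the normalized names as `fields` and the original PDF names as `fields_old`, so the stats always report the normalized name even when only the original name reveals the match. A small illustration with hypothetical field names:

```python
from formfyxer.lit_explorer import get_sensitive_data_types

new_names = ["users1_id_number"]  # normalized name: too generic to match anything
old_names = ["applicant_ssn"]     # original PDF name: matches the SSN pattern

print(get_sensitive_data_types(new_names, old_names))
# {'Social Security Number': ['users1_id_number']}
```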
@@ -1253,15 +1272,17 @@
         "category": cat,
         "pages": pages_count,
         "reading grade level": readability,
-        "time to answer": time_to_answer_form(field_types_and_sizes(ff), new_names)
-        if ff
-        else [-1, -1],
+        "time to answer": (
+            time_to_answer_form(field_types_and_sizes(ff), new_names)
+            if ff
+            else [-1, -1]
+        ),
         "list": nsmi,
         "avg fields per page": f_per_page,
         "fields": new_names,
         "fields_conf": new_names_conf,
         "fields_old": field_names,
-        "sensitive fields": sensitive_fields,
+        "sensitive data types": sensitive_data_types,
         "text": text,
         "original_text": original_text,
         "number of sentences": sentence_count,
@@ -1274,16 +1295,21 @@
         "slotin percent": slotin_count / field_count if field_count > 0 else 0,
         "gathered percent": gathered_count / field_count if field_count > 0 else 0,
         "created percent": created_count / field_count if field_count > 0 else 0,
-        "third party percent": third_party_count / field_count
-        if field_count > 0
-        else 0,
+        "third party percent": (
+            third_party_count / field_count if field_count > 0 else 0
+        ),
         "passive voice percent": (
             passive_sentences_count / sentence_count if sentence_count > 0 else 0
         ),
         "citations per field": citation_count / field_count if field_count > 0 else 0,
         "citation count": citation_count,
         "all caps percent": all_caps_count / word_count,
-        "normalized characters per field": sum(get_adjusted_character_count(field) for field in field_types ) / field_count if ff else 0,
+        "normalized characters per field": (
+            sum(get_adjusted_character_count(field) for field in field_types)
+            / field_count
+            if ff
+            else 0
+        ),
         "difficult words": difficult_words,
         "difficult word count": difficult_word_count,
         "difficult word percent": difficult_word_count / word_count,
@@ -1342,7 +1368,7 @@ def _form_complexity_per_metric(stats):
         {"name": "pages", "weight": 2},
         {"name": "citations per field", "weight": 1.2},
         {"name": "avg fields per page", "weight": 1 / 8},
-        {"name": "normalized characters per field", "weight": 1/8},
+        {"name": "normalized characters per field", "weight": 1 / 8},
         {"name": "sentences per page", "weight": 0.05},
         # percents will have a higher weight, because they are between 0 and 1
         {"name": "slotin percent", "weight": 2},
@@ -1360,11 +1386,11 @@ def weight(stats, metric):
         weight = metric.get("weight") or 1
         val = 0
         if "clip" in metric:
-            val = min(max(stats.get(name,0), metric["clip"][0]), metric["clip"][1])
+            val = min(max(stats.get(name, 0), metric["clip"][0]), metric["clip"][1])
         elif isinstance(stats.get(name), bool):
             val = 1 if stats.get(name) else 0
         else:
-            val = stats.get(name,0)
+            val = stats.get(name, 0)
         if "intercept" in metric:
             val -= metric["intercept"]
         return val * weight
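To make the re-formatted `weight` helper concrete: each metric contributes roughly `(clipped value - intercept) * weight` to the complexity score. A standalone mimic with a worked example — `name = metric["name"]` is assigned outside the lines shown in the hunk, so it is reproduced here as an assumption:

```python
def weight(stats, metric):
    # Reproduction of the helper above, for illustration only.
    name = metric["name"]  # assumed: assigned just above the hunk shown
    w = metric.get("weight") or 1
    if "clip" in metric:
        val = min(max(stats.get(name, 0), metric["clip"][0]), metric["clip"][1])
    elif isinstance(stats.get(name), bool):
        val = 1 if stats.get(name) else 0
    else:
        val = stats.get(name, 0)
    if "intercept" in metric:
        val -= metric["intercept"]
    return val * w

print(weight({"pages": 3}, {"name": "pages", "weight": 2}))  # -> 6
print(weight({"slotin percent": 0.5}, {"name": "slotin percent", "weight": 2}))  # -> 1.0
```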
diff --git a/formfyxer/tests/test_lit_explorer.py b/formfyxer/tests/test_lit_explorer.py
index 5c57076..41c1ce4 100644
--- a/formfyxer/tests/test_lit_explorer.py
+++ b/formfyxer/tests/test_lit_explorer.py
@@ -7,7 +7,7 @@
 
 sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
 
-from formfyxer.lit_explorer import spot, substitute_phrases
+from formfyxer.lit_explorer import spot, substitute_phrases, get_sensitive_data_types
 
 
 class TestSubstitutePhrases(unittest.TestCase):
@@ -115,45 +115,91 @@ def test_phrase_and_position_various_orders(self):
 class TestSpot(unittest.TestCase):
     def setUp(self) -> None:
         self.request_args = {
-            'url': 'https://spot.suffolklitlab.org/v0/entities-nested/',
-            'headers': {
-                'Authorization': 'Bearer your_SPOT_API_token goes here',
-                'Content-Type': 'application/json'
+            "url": "https://spot.suffolklitlab.org/v0/entities-nested/",
+            "headers": {
+                "Authorization": "Bearer your_SPOT_API_token goes here",
+                "Content-Type": "application/json",
+            },
+            "data": {
+                "text": "",
+                "save-text": 0,
+                "cutoff-lower": 0.25,
+                "cutoff-pred": 0.5,
+                "cutoff-upper": 0.6,
             },
-            'data': {
-                'text': '',
-                'save-text': 0,
-                'cutoff-lower': 0.25,
-                'cutoff-pred': 0.5,
-                'cutoff-upper': 0.6,
-            }
         }
         return super().setUp()
 
-    @mock.patch('requests.post')
+    @mock.patch("requests.post")
     def test_calls_spot(self, mock_post):
-        text = 'The quick brown fox jumps over the lazy dog.'
-        self.request_args['data']['text'] = text
+        text = "The quick brown fox jumps over the lazy dog."
+        self.request_args["data"]["text"] = text
         spot(text)
         mock_post.assert_called_with(
-            self.request_args['url'],
-            headers=self.request_args['headers'],
-            data=json.dumps(self.request_args['data'])
+            self.request_args["url"],
+            headers=self.request_args["headers"],
+            data=json.dumps(self.request_args["data"]),
         )
 
-    @mock.patch('requests.post')
+    @mock.patch("requests.post")
     def test_calls_spot_with_reduced_character_count(self, mock_post):
-        text = 'a' * 5001
-        reduced_text = 'a' * 5000
-        self.request_args['data']['text'] = reduced_text
+        text = "a" * 5001
+        reduced_text = "a" * 5000
+        self.request_args["data"]["text"] = reduced_text
         spot(text)
         mock_post.assert_called_with(
-            self.request_args['url'],
-            headers=self.request_args['headers'],
-            data=json.dumps(self.request_args['data'])
+            self.request_args["url"],
+            headers=self.request_args["headers"],
+            data=json.dumps(self.request_args["data"]),
+        )
+
+
+class TestGetSensitiveDataTypes(unittest.TestCase):
+    def test_without_fields_old(self):
+        actual_output = get_sensitive_data_types(
+            ["users1_name", "users1_address", "users1_ssn", "users1_routing_number"],
+            None,
+        )
+        self.assertEqual(
+            actual_output,
+            {
+                "Social Security Number": ["users1_ssn"],
+                "Bank Account Number": ["users1_routing_number"],
+            },
+        )
+
+    def test_merge_of_fields_from_both(self):
+        actual_output = get_sensitive_data_types(
+            [
+                "user_ban1",
+                "user_credit_card_number",
+                "user_cvc",
+                "user_cdl",
+                "user_social_security",
+            ],
+            [
+                "old_bank_account_number",
+                "old_credit_card_number",
+                "old_cvc",
+                "old_drivers_license",
+                "old_ssn",
+            ],
+        )
+        self.assertEqual(
+            actual_output,
+            {
+                "Bank Account Number": ["user_ban1"],
+                "Credit Card Number": ["user_credit_card_number", "user_cvc"],
+                "Driver's License Number": ["user_cdl"],
+                "Social Security Number": ["user_social_security"],
+            },
+        )
+
+    def test_no_sensitive_data_types(self):
+        actual_output = get_sensitive_data_types(
+            ["name", "address", "zip", "signature"]
+        )
+        self.assertEqual(actual_output, {})
 
 
 if __name__ == "__main__":
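One case the new tests do not cover is the length guard on `fields_old`. If coverage for it is wanted, a sketch in the style of `TestGetSensitiveDataTypes` above (this test is a suggestion, not part of the patch):

```python
class TestGetSensitiveDataTypesLengthGuard(unittest.TestCase):
    def test_mismatched_lengths_raise(self):
        # fields_old, when given, must have exactly one entry per field.
        with self.assertRaises(ValueError):
            get_sensitive_data_types(["users1_ssn"], ["old_ssn", "old_dl"])
```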