Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
SpamFilterAdversarial303COM/main.py
Go to file
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
224 lines (174 sloc)
7.4 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import nltk | |
from nltk.stem import WordNetLemmatizer | |
from nltk.corpus import wordnet | |
import string | |
from random import randrange | |
import re | |
from collections import Counter | |
import math | |
import language_tool_python | |
import os, subprocess | |
# Shared NLP helpers, built once at import time.
lemmatizer = WordNetLemmatizer()  # used by text_to_vector
stop_words = set(nltk.corpus.stopwords.words('english'))  # skipped by extractWeka / synonymGrammar
WORD = re.compile(r"\w+")  # tokenizer pattern used by text_to_vector
tool = language_tool_python.LanguageTool('en-US')  # grammar corrector used by synonymGrammar

# ---------------------------------------------------------------- User configuration
# Fill these in before running. They were previously left as bare
# `name = # comment` lines, which is a SyntaxError; empty strings keep the
# module importable and make an unconfigured path fail at its point of use.
# dataset_path, testing_path and weka_exe_path are concatenated directly with
# file names, so they must end with a path separator.
dataset_path = ""  # input dataset path, ex: "D:\\Projects\\Uni\\TestData\\spam\\"
#dataset_path = ""  # test purpose
testing_path = ""  # output folder path, ex: "C:\\Users\\Ken\\Desktop\\modify\\"
path_weka = ""  # weka result buffer path
path_weka2 = ""  # second weka result buffer (test purpose); read by createPayload(0)
model_path = ""  # weka model path
weka_exe_path = ""  # weka path
################ Prior Knowledge | |
def extractWeka(path_weka):
    """Extract (word, ham_value, spam_value) triples from a saved Weka report.

    The file is split on whitespace and scanned token by token:

    * tokens are ignored until the literal token "spam" has been seen twice
      (presumably the class-prior line and the table header of a Weka
      NaiveBayesMultinomialText report -- TODO confirm against real output);
    * scanning stops at the token pair "Time taken";
    * after that, every token whose 1-based position is a multiple of 3 is
      taken as a word, with the two preceding tokens as its ham and spam
      values. Stop words, purely numeric tokens and tokens of length <= 2
      are skipped, and the placeholder "<laplace=1>" is replaced by "1".

    Args:
        path_weka: Path of the text file holding the Weka output.

    Returns:
        A list of ``(word, value_ham, value_spam)`` tuples. The values are
        kept as strings exactly as they appear in the file.
    """
    # Context manager so the handle is released even if reading fails.
    with open(path_weka) as report:
        data = report.read().split()

    word_data = []
    seen_first_spam = False
    in_table = False
    last_index = len(data) - 1
    for index, word in enumerate(data):
        if word == "spam" and not in_table:
            # First "spam" is only counted; the second one opens the table.
            if seen_first_spam:
                in_table = True
            else:
                seen_first_spam = True
            continue
        # Bounds check: a trailing "Time" token used to raise IndexError.
        if word == "Time" and index < last_index and data[index + 1] == "taken":
            break
        if not in_table or (index + 1) % 3 != 0:
            continue
        if word in stop_words or word.isnumeric() or len(word) <= 2:
            continue
        value_ham, value_spam = data[index - 2], data[index - 1]
        if value_ham == "<laplace=1>":
            value_ham = "1"
        if value_spam == "<laplace=1>":
            value_spam = "1"
        word_data.append((word, value_ham, value_spam))
    return word_data
########################################################## Token Obfuscation | |
def orderTokens(weka_array, attack_email_words, percentage=0.5):
    """Rank the positions of the e-mail's spam-leaning words.

    A word position qualifies when some row of ``weka_array`` has the same
    word and a spam value (column 2) greater than or equal to its ham value
    (column 1). Position 0 is never considered (the original code marks it
    as "Skip Subject").

    Args:
        weka_array: Rows shaped ``(word, ham_value, spam_value)``; the values
            must be convertible with ``float``.
        attack_email_words: The e-mail split into words.
        percentage: Fraction of ``len(attack_email_words)`` used as the
            maximum number of positions returned.

    Returns:
        Indices into ``attack_email_words`` ordered by descending spam value
        (ties keep scan order), truncated to
        ``int(len(attack_email_words) * percentage)`` entries.
    """
    # Index the rows by word once instead of rescanning the whole table for
    # every e-mail word. Rows stay unparsed so that only matching rows are
    # converted to float, exactly as before.
    rows_by_word = {}
    for row in weka_array:
        rows_by_word.setdefault(row[0], []).append(row)

    all_scores = []
    for index in range(1, len(attack_email_words)):  # Skip Subject
        for row in rows_by_word.get(attack_email_words[index], ()):
            spam_value = float(row[2])
            if spam_value >= float(row[1]):
                all_scores.append((index, spam_value))

    # list.sort is stable, so equal scores keep their discovery order.
    all_scores.sort(reverse=True, key=lambda pair: pair[1])
    limit = int(len(attack_email_words) * percentage)
    return [index for index, _ in all_scores][:limit]
def tokenObfuscation(attack_email_words):
    """Obfuscate the spam-leaning words of an e-mail in place.

    The Weka table is read from ``path_weka``, ``orderTokens`` picks the word
    positions to disguise, and each of them is rewritten with
    ``obfuscateToken``.

    Args:
        attack_email_words: The e-mail split into words; modified in place.

    Returns:
        The modified words joined with single spaces.
    """
    ranked_positions = orderTokens(extractWeka(path_weka), attack_email_words)
    for position in ranked_positions:
        original_token = attack_email_words[position]
        attack_email_words[position] = obfuscateToken(original_token)
    return " ".join(attack_email_words)
def obfuscateToken(email_word, approach=2):
    """Return a disguised version of a single word.

    Args:
        email_word: The word to obfuscate.
        approach: ``1`` puts a space between every character ("word
            spacing"). ``2`` (default) replaces a/e/l/o with 4/3/1/0,
            upper-cases the result, splits it in half with a space and wraps
            it in one random ASCII letter on each side.

    Returns:
        The obfuscated string.

    Raises:
        ValueError: If ``approach`` is neither 1 nor 2. Previously the
            function fell through and returned ``None``, which only surfaced
            later as a ``TypeError`` when the words were joined.
    """
    if approach == 1:  # Word Spacing
        return " ".join(email_word)
    if approach == 2:  # Advanced Obfuscation
        # Single pass; equivalent to the chained replaces because no
        # replacement digit is itself a replacement source.
        leet = email_word.translate(str.maketrans("aelo", "4310"))
        half = len(leet) // 2
        letters = string.ascii_letters
        # Draw the prefix before the suffix to keep the RNG call order.
        prefix = letters[randrange(len(letters))]
        suffix = letters[randrange(len(letters))]
        return prefix + leet[:half].upper() + " " + leet[half:].upper() + suffix
    raise ValueError("unsupported obfuscation approach: %r" % (approach,))
#################################### Synonym Replacement | |
def synonymGrammar(attack_email_words, email, similarity=0.5, grammar_check=True, synonym=False):
    """Optionally swap words for WordNet lemmas, then optionally fix grammar.

    Args:
        attack_email_words: The e-mail split into words; modified in place
            when ``synonym`` is enabled.
        email: The original e-mail text, used as the similarity reference.
        similarity: Replacement stops as soon as the cosine similarity
            between ``email`` and the current words drops below this value.
        grammar_check: When ``True`` the joined text is passed through
            ``tool.correct``.
        synonym: When ``True`` every non-stop word is replaced by the last
            WordNet lemma name that differs from it (if any).

    Returns:
        The resulting e-mail text.
    """
    if synonym == True:  # equality on purpose: only True/1 enables the pass
        for position, token in enumerate(attack_email_words):
            current_text = " ".join(attack_email_words)
            if get_cosine(email, current_text) < similarity:
                break
            if token in stop_words:  # Exclude stopwords
                continue
            every_lemma = (
                lemma
                for synset in wordnet.synsets(token)
                for lemma in synset.lemmas()
            )
            for lemma in every_lemma:
                candidate = lemma.name()
                if candidate != token:  # Ensure diff word
                    attack_email_words[position] = candidate  # Replace

    attack_email = " ".join(attack_email_words)
    if grammar_check == True:
        return tool.correct(attack_email)
    return attack_email
#https://stackoverflow.com/questions/15173225/calculate-cosine-similarity-given-2-sentence-strings | |
def get_cosine(text1, text2):
    """Return the cosine similarity of two texts' word-count vectors.

    Both texts are vectorised with ``text_to_vector``.

    Returns:
        The similarity as a float, or ``0.0`` when either vector has zero
        length (for example, an empty text).
    """
    first = text_to_vector(text1)
    second = text_to_vector(text2)

    shared_terms = set(first.keys()) & set(second.keys())
    dot_product = sum(first[term] * second[term] for term in shared_terms)

    first_norm = math.sqrt(sum(count ** 2 for count in first.values()))
    second_norm = math.sqrt(sum(count ** 2 for count in second.values()))
    norm_product = first_norm * second_norm

    if norm_product:
        return float(dot_product) / norm_product
    return 0.0
def text_to_vector(text):
    """Return a ``Counter`` of the lemmatized word tokens found in ``text``.

    Tokens are the matches of the module-level ``WORD`` pattern; each one is
    passed through ``lemmatizer.lemmatize`` before counting.
    """
    return Counter(lemmatizer.lemmatize(token) for token in WORD.findall(text))
#################################### Token Injection | |
def getBestTokens(nature_type, percentage=0.01):
    """Return a top slice of the Weka table ranked by one class column.

    Args:
        nature_type: ``"ham"`` selects column 1 and ``"spam"`` column 2; any
            other value is used as the column index as-is.
        percentage: Fraction of the table length used as the slice end.

    Returns:
        The rows read from ``path_weka``, sorted by the chosen column in
        descending order, from rank 1 (the very first row is skipped) up to
        ``int(len(table) * percentage)`` exclusive.
    """
    if nature_type == "ham":
        column = 1
    elif nature_type == "spam":
        column = 2
    else:
        column = nature_type

    ranked = sorted(
        extractWeka(path_weka),
        key=lambda row: float(row[column]),
        reverse=True,
    )
    cutoff = int(len(ranked) * percentage)
    return ranked[1:cutoff]
def injectToken(attack_email_words, inject_type, poison=None, percentage=0.5):
    """Inject extra tokens into an e-mail's word list.

    Args:
        attack_email_words: The e-mail split into words; modified in place.
        inject_type: Passed to ``getBestTokens`` to choose the token pool
            when ``poison`` is ``None``; unused otherwise.
        poison: Optional explicit pool of rows whose first element is the
            token. When given, tokens are inserted at random positions
            (never after the current last word); when ``None``, tokens from
            ``getBestTokens(inject_type)`` are appended at the end.
        percentage: The number of injected tokens is
            ``int(len(attack_email_words) * percentage)``, computed once
            from the initial length.

    Returns:
        The modified words joined with single spaces.

    Raises:
        ValueError: If at least one token must be injected but the pool is
            empty. ``randrange`` raised the same exception type before, with
            a message that did not say what was empty.
    """
    count = int(len(attack_email_words) * percentage)
    pool = getBestTokens(inject_type) if poison is None else poison
    if count > 0 and len(pool) == 0:
        raise ValueError("injectToken: no tokens available to inject")

    for _ in range(count):
        if poison is None:
            attack_email_words.append(pool[randrange(len(pool))][0])
        else:
            # Position is drawn before the token to keep the RNG call order.
            position = randrange(len(attack_email_words))
            attack_email_words.insert(position, pool[randrange(len(pool))][0])
    return " ".join(attack_email_words)
###################################### TEST&SAVE | |
def saveEmail(file_name, email):
    """Write a modified e-mail into the ``spam`` folder under ``testing_path``.

    The output file is named ``file_name`` with ``"_p"`` appended and is
    written as UTF-8; unencodable characters are replaced.

    Args:
        file_name: Name of the source e-mail file.
        email: Text to write.
    """
    target = testing_path + "\\spam\\" + file_name + "_p"
    # Context manager so the handle is closed even if the write fails.
    with open(target, "w", encoding="utf8", errors='replace') as output:
        output.write(email)
def testEmails():
    """Convert the generated e-mails under ``testing_path`` to ``test.arff``.

    Runs Weka's ``TextDirectoryLoader`` through ``java.exe`` and stores its
    standard output in ``testing_path + "test.arff"``.

    The command is passed as an argument list rather than a shell string, so
    a ``testing_path`` containing spaces is no longer split into several
    arguments. As before, Weka's exit status is not checked.

    Raises:
        FileNotFoundError: If ``java.exe`` cannot be found (the shell used
            to report this on stderr instead).
    """
    # convert to .arff
    command = [
        "java.exe",
        "-cp", weka_exe_path + "weka.jar",
        "weka.core.converters.TextDirectoryLoader",
        "-dir", str(testing_path),
    ]
    with open(str(testing_path) + "test.arff", "wb") as arff_file:
        subprocess.run(command, stdout=arff_file)
    # run model against test (disabled in the original script)
    #command = "java.exe -cp \""+ weka_exe_path + "weka.jar\" weka.classifiers.bayes.NaiveBayesMultinomialText -l " + str(model_path) + "\\final.model -T " + str(testing_path) + "test.arff -classifications weka.classifiers.evaluation.output.prediction.PlainText"
    #test = os.system(command)
####################################### PAYLOAD | |
def createPayload(main, option=None, poison=None):
    """Build adversarial versions of every e-mail in ``dataset_path``.

    Each file is read, printed, transformed according to ``main``, saved
    with ``saveEmail`` and printed again; ``testEmails`` runs once at the
    end.

    Args:
        main: Mode selector.
            ``0`` -- debug: print the words shared by the top ham rows of
            ``path_weka`` and the rows of ``path_weka2``, then return
            without saving or testing (``path_weka2`` must be defined at
            module level).
            ``1`` -- layered attack controlled by ``option``.
            ``2`` -- inject the top-ranked tokens of one class at random
            positions.
            Any other value saves the e-mail unchanged.
        option: For ``main == 1`` an integer: ``>= 0`` runs
            ``synonymGrammar``, ``>= 1`` additionally ``tokenObfuscation``,
            ``>= 2`` additionally ``injectToken`` with ham tokens. For
            ``main == 2`` it names the class (``"ham"``/``"spam"``) when
            ``poison`` is not given.
        poison: For ``main == 2``, the class whose tokens are injected;
            ``"spam"`` selects the spam column, anything else the ham
            column.

    Returns:
        None.
    """
    for file_name in os.listdir(dataset_path):
        # Context manager so the handle is closed even if reading fails.
        with open(dataset_path + file_name, "r", encoding="utf8", errors='replace') as source:
            email = source.read()
        attack_email = email
        attack_email_words = attack_email.split()
        print(email)

        if main == 0:
            small = extractWeka(path_weka)
            big = extractWeka(path_weka2)
            small.sort(reverse=True, key=lambda x: float(x[1]))
            small = small[1:11]
            big.sort(reverse=True, key=lambda x: float(x[1]))
            for s in small:
                for b in big:
                    if s[0] == b[0]:
                        print(s, b)
            return

        if main == 1:
            if option >= 0:
                # Cosine, Grammar, Synonym
                attack_email = synonymGrammar(attack_email_words, email)
                print(attack_email + "\n")
            if option >= 1:
                # NOTE(review): this works on attack_email_words, so the
                # grammar-corrected text produced above is not carried into
                # the later stages -- confirm this is intended.
                attack_email = tokenObfuscation(attack_email_words)
                print(attack_email + "\n")
            if option >= 2:
                attack_email = injectToken(attack_email_words, "ham")

        if main == 2:
            # The parameter used to be overwritten with [] before being
            # compared to "spam", so the spam column could never be chosen.
            # The documented call style is createPayload(2, "ham"), hence
            # the fallback to `option`.
            inject_type = poison if poison is not None else option
            index = 2 if inject_type == "spam" else 1
            tokens = extractWeka(path_weka)
            tokens.sort(reverse=True, key=lambda x: float(x[index]))
            tokens = tokens[1:11]
            attack_email = injectToken(attack_email_words, inject_type, tokens)

        saveEmail(file_name, attack_email)
        print(file_name + "\n" + attack_email)
    testEmails()
############################################ MAIN | |
def main():
    """Script entry point: run the attack selected below over the dataset."""
    # optimal attack: 1,2 | evasion: 2, "ham"
    createPayload(main=1, option=2)


main()