# email_processing.py

import email
import re
from email.utils import parsedate_to_datetime, getaddresses
from typing import Dict, Any, List

import numpy as np
from bs4 import BeautifulSoup
import spacy
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from gensim import corpora
from gensim.models import LdaModel
from gensim.parsing.preprocessing import STOPWORDS
from gensim.utils import simple_preprocess
from textblob import TextBlob

# Load the small English spaCy pipeline once at import time; this requires
# `python -m spacy download en_core_web_sm` to have been run.
nlp = spacy.load("en_core_web_sm")

# Functions for parsing emails, extracting metadata, calculating relevance
# scores, and performing entity recognition, topic clustering, sentiment
# analysis, and keyword extraction.

def parse_email(email_content: str) -> Dict[str, Any]:
    """Parse a raw RFC 2822 message into its headers and a plain-text body."""
    msg = email.message_from_string(email_content)

    body = ""
    html_body = ""
    if msg.is_multipart():
        for part in msg.walk():
            payload = part.get_payload(decode=True)
            if payload is None:  # container parts have no decodable payload
                continue
            charset = part.get_content_charset() or "utf-8"
            if part.get_content_type() == "text/plain":
                body = payload.decode(charset, errors="replace")
            elif part.get_content_type() == "text/html":
                html_body = payload.decode(charset, errors="replace")
    else:
        payload = msg.get_payload(decode=True)
        if payload is not None:
            body = payload.decode(msg.get_content_charset() or "utf-8", errors="replace")

    # Fall back to stripping tags from the HTML part when no plain-text part exists.
    if not body and html_body:
        body = BeautifulSoup(html_body, "html.parser").get_text()

    return {
        'subject': msg['subject'],
        'from': msg['from'],
        'to': msg['to'],
        'cc': msg['cc'],
        'date': parsedate_to_datetime(msg['date']) if msg['date'] else None,
        'body': body,
        'message_id': msg['message-id'],
        'in_reply_to': msg['in-reply-to'],
        'references': msg['references']
    }

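
# A minimal usage sketch for parse_email. The message below is fabricated for
# illustration (addresses, IDs, and dates are placeholders, not real data).
def _demo_parse_email() -> Dict[str, Any]:
    raw = (
        "From: alice@example.com\r\n"
        "To: bob@example.com\r\n"
        "Subject: Quarterly report\r\n"
        "Date: Mon, 01 Jan 2024 09:00:00 +0000\r\n"
        "Message-ID: <demo-1@example.com>\r\n"
        "Content-Type: text/plain\r\n"
        "\r\n"
        "Hi Bob, see http://example.com/report and reply to alice@example.com.\r\n"
    )
    return parse_email(raw)
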
def extract_metadata(parsed_email: Dict[str, Any]) -> Dict[str, Any]:
    """Collect header fields plus any addresses and URLs found in the body."""
    metadata = {
        'subject': parsed_email['subject'],
        'from': getaddresses([parsed_email['from']]) if parsed_email['from'] else [],
        'to': getaddresses([parsed_email['to']]) if parsed_email['to'] else [],
        'cc': getaddresses([parsed_email['cc']]) if parsed_email['cc'] else [],
        'date': parsed_email['date'],
        'email_addresses': re.findall(r'[\w.-]+@[\w.-]+', parsed_email['body']),
        'urls': re.findall(r'https?://(?:[a-zA-Z0-9$-_@.&+!*\(\),]|(?:%[0-9a-fA-F]{2}))+',
                           parsed_email['body']),
        'message_id': parsed_email['message_id'],
        'in_reply_to': parsed_email['in_reply_to'],
        'references': parsed_email['references']
    }
    return metadata

def calculate_relevance_score(email_body: str, query: str) -> float:
    """Cosine similarity between the TF-IDF vectors of the body and the query."""
    # Guard against empty inputs, which would raise a ValueError inside
    # TfidfVectorizer (empty vocabulary).
    if not email_body.strip() or not query.strip():
        return 0.0
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform([email_body, query])
    cosine_sim = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])
    return float(cosine_sim[0][0])

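
# Illustrative check of the TF-IDF relevance score: queries sharing vocabulary
# with the body score higher. The sample strings are made up; exact values
# depend on tokenization.
def _demo_relevance() -> Dict[str, float]:
    body = "The budget review meeting is moved to Friday afternoon."
    return {
        'related_query': calculate_relevance_score(body, "budget meeting"),
        'unrelated_query': calculate_relevance_score(body, "vacation photos"),
    }
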
def extract_entities(text: str) -> Dict[str, list]:
    """Group named entities found by spaCy into a few label buckets."""
    doc = nlp(text)
    entities = {
        'PERSON': [],
        'ORG': [],
        'GPE': [],  # geopolitical entity (countries, cities, states)
        'DATE': [],
        'MONEY': []
    }
    for ent in doc.ents:
        if ent.label_ in entities:
            entities[ent.label_].append(ent.text)
    return entities

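
# Entity extraction sketch using the spaCy model loaded above. The sentence is
# fabricated, and the exact labels returned depend on the en_core_web_sm
# model version.
def _demo_entities() -> Dict[str, list]:
    return extract_entities("Sarah from Acme Corp approved the $5,000 budget on Monday.")
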
def perform_topic_clustering(email_bodies: List[str], num_topics: int = 5) -> List[Dict[str, Any]]:
    """Fit an LDA model over the email bodies and return the top words per topic."""
    def preprocess(text):
        return [token for token in simple_preprocess(text) if token not in STOPWORDS]

    processed_docs = [preprocess(doc) for doc in email_bodies]
    dictionary = corpora.Dictionary(processed_docs)
    corpus = [dictionary.doc2bow(doc) for doc in processed_docs]

    lda_model = LdaModel(corpus=corpus, id2word=dictionary, num_topics=num_topics, random_state=100,
                         update_every=1, chunksize=100, passes=10, alpha='auto', per_word_topics=True)

    # show_topics(formatted=False) yields (topic_id, [(word, weight), ...]),
    # avoiding the need to re-parse the formatted "weight*word" strings
    # (the original string-splitting produced weight-keyed dicts with quoted words).
    topics = lda_model.show_topics(num_topics=num_topics, num_words=10, formatted=False)
    return [{'id': topic_id, 'words': {word: float(weight) for word, weight in words}}
            for topic_id, words in topics]

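
# Topic clustering sketch on a toy corpus of fabricated one-line "emails".
# With this little text the topics are not meaningful; it only shows the
# output shape: one dict per topic mapping top words to their weights.
def _demo_topics() -> List[Dict[str, Any]]:
    docs = [
        "project deadline moved to next sprint",
        "invoice payment overdue please remit",
        "team lunch scheduled for friday",
        "sprint planning and project backlog review",
        "payment received thanks for the invoice",
    ]
    return perform_topic_clustering(docs, num_topics=2)
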
def analyze_email_thread(emails: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Group emails into threads using Message-ID / In-Reply-To headers."""
    thread_map = {}
    for msg in emails:  # named `msg` to avoid shadowing the imported `email` module
        message_id = msg['metadata']['message_id']
        in_reply_to = msg['metadata']['in_reply_to']
        if in_reply_to:
            if in_reply_to in thread_map:
                thread_map[in_reply_to]['replies'].append(msg)
            else:
                thread_map[in_reply_to] = {'email': None, 'replies': [msg]}
        if message_id:
            if message_id in thread_map:
                thread_map[message_id]['email'] = msg
            else:
                thread_map[message_id] = {'email': msg, 'replies': []}

    # Thread roots are messages that reply to nothing. Skip placeholder
    # entries whose parent message was never seen (their 'email' is None),
    # which would otherwise raise a TypeError here.
    threads = []
    for message_id, thread_info in thread_map.items():
        if thread_info['email'] and not thread_info['email']['metadata']['in_reply_to']:
            threads.append(thread_info)

    return threads

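
# Thread reconstruction sketch: two fabricated messages where the second
# replies to the first, so analyze_email_thread returns a single root thread
# containing one reply.
def _demo_threads() -> List[Dict[str, Any]]:
    first = {'metadata': {'message_id': '<demo-1@example.com>', 'in_reply_to': None}}
    reply = {'metadata': {'message_id': '<demo-2@example.com>',
                          'in_reply_to': '<demo-1@example.com>'}}
    return analyze_email_thread([first, reply])
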
def perform_sentiment_analysis(text: str) -> Dict[str, float]:
    """TextBlob polarity (-1.0 to 1.0) and subjectivity (0.0 to 1.0) for the text."""
    blob = TextBlob(text)
    return {
        'polarity': blob.sentiment.polarity,
        'subjectivity': blob.sentiment.subjectivity
    }

def extract_keywords(text: str, num_keywords: int = 10) -> List[str]:
    """Top content words (nouns, proper nouns, adjectives) from the text."""
    doc = nlp(text)
    keywords = []
    for token in doc:
        if not token.is_stop and not token.is_punct and token.pos_ in ['NOUN', 'PROPN', 'ADJ']:
            keywords.append(token.text.lower())
    # dict.fromkeys deduplicates while keeping first-seen order, so the slice
    # below is deterministic (slicing a plain set() returns arbitrary items).
    return list(dict.fromkeys(keywords))[:num_keywords]
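
# End-to-end sketch tying the pieces together; all inputs are fabricated demo
# data from the helpers above, so printed values are illustrative only.
if __name__ == "__main__":
    parsed = _demo_parse_email()
    print("Parsed subject:", parsed['subject'])
    print("Metadata:", extract_metadata(parsed))
    print("Relevance:", _demo_relevance())
    print("Entities:", _demo_entities())
    print("Topics:", _demo_topics())
    print("Threads:", _demo_threads())
    print("Sentiment:", perform_sentiment_analysis(parsed['body']))
    print("Keywords:", extract_keywords(parsed['body'], num_keywords=5))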