J’ai finalement fait une version V2 qui corrige quelques problèmes (exceptions sur quelques e-mails ou quelques noms de domaine). Il faut remplacer « MonLogin » par son propre login dans le chemin path_mail.
#!/usr/bin/env python3
import email
import plistlib
import re
import glob, os
import string
from datetime import datetime
from email.utils import parsedate_to_datetime
from email.header import Header, decode_header, make_header
from elasticsearch import Elasticsearch
class Emlx(object):
    """Reader for Apple Mail ``.emlx`` files.

    As read by :meth:`parse`, a file consists of:

    1. a first line holding the size, in bytes, of the message;
    2. that many bytes of raw message data;
    3. a trailing property list, which takes up the rest of the file.

    Attributes are overwritten on every call to :meth:`parse`.
    """

    def __init__(self):
        super().__init__()
        # Size in bytes of the message payload, as announced on the first line.
        self.bytecount = 0
        # Parsed message (``email.message.Message``) of the last file read.
        self.msg_data = None
        # Decoded trailing property list of the last file read.
        self.msg_plist = None

    def parse(self, filename_path):
        """Parse one ``.emlx`` file.

        Args:
            filename_path: Path of the ``.emlx`` file to read.

        Returns:
            A ``(message, plist)`` tuple: the parsed e-mail message and the
            object decoded from the trailing property list.

        Raises:
            OSError: If the file cannot be opened or read.
            ValueError: If the first line is not an integer byte count.
            Exception: Whatever ``plistlib.loads`` raises when the trailing
                data is not a valid property list.
        """
        with open(filename_path, "rb") as f:
            # The first line announces how many bytes the message occupies.
            self.bytecount = int(f.readline().strip())
            self.msg_data = email.message_from_bytes(f.read(self.bytecount))
            # Everything after the message is the property list.
            self.msg_plist = plistlib.loads(f.read())
        return self.msg_data, self.msg_plist
# Patterns used to split the From header.  The matched text keeps the "@"
# on purpose (e.g. "john.doe@" and "@example.com"): these are the values
# this script has always stored in the "name" and "domain" fields.
_DOMAIN_RE = re.compile(r"@[\w.\-]+")
_LOCAL_PART_RE = re.compile(r"[\w.\-]+@")
# First dotted quad found in the Received header (octet values not validated).
_IPV4_RE = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")
_NON_ASCII_RE = re.compile(r"[^\x00-\x7f]")


def _format_date(raw_date):
    """Return the Date header as local time ``YYYY-MM-DDTHH:MM:SS``.

    Returns an empty string when the header is missing or cannot be parsed.
    """
    if raw_date is None:
        return ""
    try:
        # str() also copes with email.header.Header instances.
        parsed = parsedate_to_datetime(str(raw_date))
        local = datetime.fromtimestamp(parsed.timestamp())
        return local.strftime('%Y-%m-%dT%H:%M:%S')
    except (TypeError, ValueError, IndexError, OverflowError, OSError):
        return ""


def build_document(message, size: int, doc_id: int, filename: str) -> dict:
    """Build the document to index for one parsed message.

    Args:
        message: Parsed e-mail; only ``message[<header name>]`` lookups are
            used (From, Date, Received, Message-ID).
        size: Size in bytes of the ``.emlx`` file.
        doc_id: Sequential identifier stored in the ``id`` field.
        filename: Base name of the ``.emlx`` file.

    Returns:
        A dict with the keys ``name``, ``domain``, ``size``, ``id`` and
        ``file``, plus ``date``, ``ip`` and ``Message-ID`` when the
        corresponding header is present and usable.
    """
    # str() turns a missing header into "None" and a Header object into text.
    sender = str(message['From'])
    domain_match = _DOMAIN_RE.search(sender)
    local_match = _LOCAL_PART_RE.search(sender)
    # Fallback name: the raw From text without commas, quotes or non-ASCII.
    cleaned = _NON_ASCII_RE.sub('', sender.replace(",", "").replace('"', '')).lower()

    if domain_match is None:
        name, domain = cleaned, "None"
    else:
        domain = domain_match.group().lower()
        # A domain without a local part (e.g. "<@example.com>") used to reuse
        # the previous message's name or raise NameError; fall back instead.
        name = local_match.group().lower() if local_match else cleaned

    doc = {"name": name, "domain": domain}

    date_str = _format_date(message['Date'])
    if date_str:
        doc["date"] = date_str
    doc["size"] = size
    doc["id"] = doc_id

    received = message['Received']
    if received is not None:
        ip_match = _IPV4_RE.search(str(received))
        if ip_match is not None:
            doc["ip"] = ip_match.group()

    message_id = message['Message-ID']
    if message_id is not None:
        doc["Message-ID"] = str(message_id).strip()

    doc["file"] = filename
    return doc


if __name__ == '__main__':
    # Local imports: only the script entry point needs them.
    import json
    import sys

    msg = Emlx()
    nb_parse = 0
    nb_error = 0
    path_mail = "/Users/MonLogin/Library/Mail/V6/"
    es_keys = "mail"
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    for root, dirs, files in os.walk(path_mail):
        for file in files:
            if not file.endswith(".emlx"):
                continue
            file_full = os.path.join(root, file)
            try:
                message, _plist = msg.parse(file_full)
                size = os.stat(file_full).st_size
            except Exception as exc:
                # One unreadable file must not abort the whole indexing run.
                nb_error += 1
                print("skipped %s: %s" % (file_full, exc), file=sys.stderr)
                continue
            doc = build_document(message, size, nb_parse, file)
            # Serialising with json.dumps escapes quotes, backslashes and
            # newlines found in header values.
            print(json.dumps(doc, ensure_ascii=False))
            try:
                # NOTE(review): doc_type/body match older Elasticsearch
                # clients; confirm against the installed client version.
                es.index(index=es_keys, doc_type='emlx', id=nb_parse, body=doc)
            except Exception as exc:
                nb_error += 1
                print("indexing failed for %s: %s" % (file_full, exc), file=sys.stderr)
            nb_parse += 1
    print(nb_parse)
    if nb_error:
        print("errors: %d" % nb_error, file=sys.stderr)
Et désolé si le programme n’est pas très propre et manque de commentaires… le but est surtout de jouer avec Elasticsearch/Kibana !