MacOS : Python : Découverte de l’API Python Elasticsearch/Kibana (Version V2)

J’ai finalement fait une version V2 qui corrige quelques problèmes (exceptions sur quelques emails ou sur quelques noms de domaine). Il faut remplacer MonLogin par son propre login.

#!/usr/bin/env python3

import email
import plistlib
import re
import glob, os
import string
from datetime import datetime
from email.utils import parsedate_to_datetime
from email.header import Header, decode_header, make_header
from elasticsearch import Elasticsearch 

class Emlx(object):
    """Reader for Apple Mail ``.emlx`` files.

    As read by :meth:`parse`, a file consists of a first line holding a byte
    count, that many bytes of e-mail message, and then a property list that
    runs to the end of the file.
    """

    def __init__(self):
        super(Emlx, self).__init__()
        # Results of the most recent parse() call.
        self.bytecount, self.msg_data, self.msg_plist = 0, None, None

    def parse(self, filename_path):
        """Parse one ``.emlx`` file.

        Args:
            filename_path: path of the file to read.

        Returns:
            A ``(message, plist)`` tuple: the ``email.message.Message`` built
            from the message bytes and the object decoded from the trailing
            property list. Both are also kept on the instance, together with
            the byte count read from the first line.
        """
        with open(filename_path, "rb") as emlx_file:
            self.bytecount = int(emlx_file.readline().strip())
            raw_message = emlx_file.read(self.bytecount)
            raw_plist = emlx_file.read()
        self.msg_data = email.message_from_bytes(raw_message)
        self.msg_plist = plistlib.loads(raw_plist)
        return self.msg_data, self.msg_plist

# Patterns used to pull sender and relay information out of header text.
# NOTE: the matches deliberately keep the "@" (domain "@example.com", name
# "john@"), exactly as the previous version of this script indexed them.
_DOMAIN_RE = re.compile(r"@[\w.\-\_]+")
_NAME_RE = re.compile(r"[\w.\-\_]+@")
_IPV4_RE = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')
_NON_ASCII_RE = re.compile(r'[^\x00-\x7f]')


def build_document(message, file_name, size, doc_id):
    """Build the Elasticsearch document describing one parsed e-mail.

    The document is returned as a dict so that the JSON encoder takes care of
    quoting; header values containing quotes or backslashes can no longer
    produce an invalid document.

    Args:
        message: the parsed ``email.message.Message``.
        file_name: name of the ``.emlx`` file, stored under ``"file"``.
        size: size of the file in bytes, stored under ``"size"``.
        doc_id: running number of the mail, stored under ``"id"``.

    Returns:
        A dict with the keys ``name``, ``domain``, ``size``, ``id`` and
        ``file``, plus ``date``, ``ip`` and ``Message-ID`` when the matching
        header is present and usable.
    """
    document = {}

    # str() turns a missing header into the text "None", as before.
    sender = str(message['From'])
    domain_match = _DOMAIN_RE.search(sender)
    if domain_match is not None:
        name_match = _NAME_RE.search(sender)
        # Both values are computed for this mail only; nothing is carried
        # over from a previously processed message.
        document["name"] = name_match.group().lower() if name_match is not None else ""
        document["domain"] = domain_match.group().lower()
    else:
        # No address found: index the cleaned-up raw header text instead.
        cleaned = sender.replace(",", "").replace('"', '')
        document["name"] = _NON_ASCII_RE.sub('', cleaned).lower()
        document["domain"] = "None"

    raw_date = message['Date']
    if raw_date is not None:
        try:
            parsed = parsedate_to_datetime(str(raw_date))
            # fromtimestamp() without a tz yields naive local time, which is
            # what this script has always stored.
            document["date"] = datetime.fromtimestamp(parsed.timestamp()).strftime('%Y-%m-%dT%H:%M:%S')
        except (TypeError, ValueError, OverflowError, OSError):
            # Unparsable or out-of-range date: index the mail without one.
            pass

    document["size"] = size
    document["id"] = doc_id

    # message['Received'] only returns the first Received header.
    received = message['Received']
    if received is not None:
        ip_match = _IPV4_RE.search(str(received))
        if ip_match is not None:
            document["ip"] = ip_match.group()

    message_id = message['Message-ID']
    if message_id is not None:
        document["Message-ID"] = str(message_id).strip()

    document["file"] = file_name
    return document


if __name__ == '__main__':
    import json
    import sys

    msg = Emlx()
    nb_parse = 0
    nb_error = 0
    path_mail = "/Users/MonLogin/Library/Mail/V6/"
    es_keys = "mail"
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    for root, dirs, files in os.walk(path_mail):
        for file in files:
            if not file.endswith(".emlx"):
                continue
            file_full = os.path.join(root, file)
            message, plist = msg.parse(file_full)
            statinfo = os.stat(file_full)
            document = build_document(message, file, statinfo.st_size, nb_parse)
            # One JSON document per line on stdout, as in the previous version.
            print(json.dumps(document, ensure_ascii=False, separators=(',', ':')))
            try:
                es.index(index=es_keys, doc_type='emlx', id=nb_parse, body=document)
            except Exception as exc:
                # Best effort: one rejected document must not stop the run,
                # but the failure is reported instead of being swallowed.
                nb_error += 1
                print("Indexing failed for", file_full, ":", exc, file=sys.stderr)
            nb_parse += 1
    print(nb_parse)
    if nb_error:
        print(nb_error, "document(s) could not be indexed", file=sys.stderr)

 

Et désolé si le programme n’est pas très propre et sans commentaire … le but est surtout de jouer avec Elasticsearch/Kibana !

MacOS : Python : Découverte de l’API Python Elasticsearch

J’ai voulu faire un premier programme afin de découvrir l’API Elasticsearch, comme base d’information j’ai pris mes emails. C’est assez simple, toutes les personnes sous MacOS ont des emails …

Voici donc le petit programme en Python (pour Michel) : il suffit de changer MonUser.

#!/usr/bin/env python3

import email
import plistlib
import re
import glob, os
from datetime import datetime
from email.utils import parsedate_to_datetime
from email.header import Header, decode_header, make_header
from elasticsearch import Elasticsearch 

class Emlx(object):
    """Parser for the ``.emlx`` files written by Apple Mail.

    parse() expects a first line containing a byte count, followed by that
    many bytes of e-mail message and, after them, a property list.
    """

    def __init__(self):
        super(Emlx, self).__init__()
        self.msg_plist = None   # property list decoded by the last parse()
        self.msg_data = None    # message object built by the last parse()
        self.bytecount = 0      # message length announced on the first line

    def parse(self, filename_path):
        """Read ``filename_path`` and return ``(message, plist)``.

        The same two objects, and the byte count from the first line, are
        also stored on the instance.
        """
        with open(filename_path, "rb") as stream:
            length = int(stream.readline().strip())
            self.bytecount = length
            self.msg_data = email.message_from_bytes(stream.read(length))
            self.msg_plist = plistlib.loads(stream.read())
        parsed = (self.msg_data, self.msg_plist)
        return parsed

if __name__ == '__main__':
    # Walk the Apple Mail store, build one small document per .emlx file and
    # index it into the local Elasticsearch instance.
    import json

    msg = Emlx()
    nb_parse = 0
    path_mail = "/Users/MonUser/Library/Mail/V6/"
    es_keys = "mail"
    es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
    for root, dirs, files in os.walk(path_mail):
        for file in files:
            if not file.endswith(".emlx"):
                continue
            file_full = os.path.join(root, file)
            message, plist = msg.parse(file_full)
            statinfo = os.stat(file_full)
            my_date = message['Date']
            my_id = message['Message-ID']
            my_server = message['Received']
            # str() turns a missing From header into the text "None".
            my_email = str(message['From'])

            # The document is a dict so that the JSON encoder handles the
            # quoting; hand-concatenated JSON broke on quotes/backslashes.
            document = {}
            my_domain = re.search(r"@[\w.\-\_]+", my_email)
            my_name = re.search(r"[\w.\-\_]+@", my_email)
            if my_domain is not None:
                # The name pattern may fail even though the domain matched.
                document["name"] = my_name.group() if my_name is not None else ""
                document["domain"] = my_domain.group()
            else:
                document["name"] = my_email.replace(",", "").replace('"', '')
                document["domain"] = "None"

            if my_date is not None:
                try:
                    # Naive local time, formatted the way the index expects.
                    document["date"] = datetime.fromtimestamp(
                        parsedate_to_datetime(str(my_date)).timestamp()
                    ).strftime('%Y-%m-%dT%H:%M:%S')
                except (TypeError, ValueError, OverflowError, OSError):
                    # Unparsable Date header: index the mail without a date
                    # instead of aborting the whole run.
                    pass
            document["size"] = statinfo.st_size
            document["id"] = nb_parse

            # message['Received'] only returns the first Received header.
            if my_server is not None:
                ip = re.search(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', str(my_server))
                if ip is not None:
                    document["ip"] = ip.group()

            if my_id is not None:
                document["Message-ID"] = str(my_id).strip()
            document["file"] = file

            # One JSON document per line on stdout, as before.
            print(json.dumps(document, ensure_ascii=False, separators=(',', ':')))
            res = es.index(index=es_keys, doc_type='emlx', id=nb_parse, body=document)
            nb_parse += 1
    print(nb_parse)

Le but de ce programme c’est simplement de mieux comprendre l’API.
Pour le lancer j’ai fait :

sudo python3 ParseEmail.py > email-json.txt

A noter que le Terminal doit avoir certains droits pour que cela fonctionne : https://www.cyber-neurones.org/2019/11/macos-acces-a-library-mail-via-un-terminal/ .

Ensuite pour faire un petit contrôle il suffit de faire : http://localhost:9200/mail/_mappings .

{"mail":{"mappings":{"emlx":{"properties":{"Message-ID":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"date":{"type":"date"},"domain":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"file":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"id":{"type":"long"},"ip":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"name":{"type":"text","fields":{"keyword":{"type":"keyword","ignore_above":256}}},"size":{"type":"long"}}}}}}

Je viens de lancer le programme … c’est très long, voici ce qu’il y a pour l’instant en base (sur les 20 dernières années) :

En mode Histogramme :

OpenDataGouv : Transaction immobilière sur Biot

En quelques étapes …

Etape 1: Voir un tweet passer de @ollybret :

Etape 2 : Vérifier l’information sur @datagouvfr : le lien étant : https://www.data.gouv.fr/fr/datasets/demandes-de-valeurs-foncieres/ 

Et se faire un profil :

Etape 3 : Voir du JSON : Micro-API DVF (Demande de Valeurs Foncières) et une API :

Voir même le résultat en fonction de la commune ou du code postal :

  • http://api.cquest.org/dvf?code_commune=94068 ( code_commune = code INSEE de la commune (ex: 94068) )
  • http://api.cquest.org/dvf?code_postal=89110 ( code_postal = code postal )

Superbe travail de Christian Quest ( sur Twitter @cq94 ‏) avec en plus la mise du code sur GitHub : https://github.com/cquest/dvf_as_api .( @github )

Etape 4 : Sortir le python ( @ThePSF ) et se faire une petite carte sur Biot :

$ python3 API_DVF_ToStaticMapColor.py 
Number total of request 1
Number total of pin 2598 Before date 624 After date 1974

Et publier les sources ( https://github.com/CYBERNEURONES/Python ). Les conditions d’utilisation des données : http://data.cquest.org/dgfip_dvf/conditions-generales-dutilisation.pdf .

#
# for Python 3
# 
#   ARIAS Frederic
#   Sorry ... It's difficult for me the python :)
#

from time import gmtime, strftime
import time
import json
import requests
from datetime import *
from staticmap import StaticMap, CircleMarker

# Draw every located property transaction (DVF) of one postal code on an
# OpenStreetMap background: one colour for transactions dated after the
# reference date, another for those dated on or before it.
m = StaticMap(1100, 1100, url_template='http://a.tile.osm.org/{z}/{x}/{y}.png')

codepostal = "06410"
date_inondation = datetime.strptime("2015-10-03",'%Y-%m-%d')
nb_request = 0
nb_plot = 0
after_inondation = 0
before_inondation = 0
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}
url_notes = (
    "http://api.cquest.org/dvf?"
    "code_postal="+codepostal
)
try:
    # The timeout keeps the script from hanging forever on a stalled server;
    # a timeout is reported by the RequestException handler below.
    resp = requests.get(url_notes, headers=headers, timeout=60)
    nb_request += 1
    resp.raise_for_status()
    resp_dict = resp.json()
    for my_resource in resp_dict.get('resultats', []):
        long = my_resource.get('lon')
        lat = my_resource.get('lat')
        if not (long and lat):
            # Transaction without coordinates: nothing to draw.
            continue
        try:
            my_date = datetime.strptime(my_resource.get('date_mutation'),'%Y-%m-%d')
        except (TypeError, ValueError):
            # Missing or malformed date: the marker colour cannot be chosen.
            continue
        nb_plot += 1
        if my_date > date_inondation:
            colour = '#0036FF'
            after_inondation += 1
        else:
            colour = '#FF3600'
            before_inondation += 1
        m.add_marker(CircleMarker((float(long), float(lat)), colour, 8))
except requests.exceptions.HTTPError as e:
    print("Bad HTTP status code:", e)
except requests.exceptions.RequestException as e:
    print("Network error:", e)

if nb_plot > 0:
    # Same map at three zoom levels.
    for zoom in (13, 14, 15):
        image = m.render(zoom=zoom)
        image.save('mymap_zoom%d_col.png' % zoom)
else:
    # Rendering a map without any marker would fail, so skip it.
    print("No located transaction to draw, no map written")

print ("Number total of request",nb_request)
print ("Number total of pin",nb_plot,"Before date",before_inondation,"After date",after_inondation)

Etape n°5 : Voir le résultat :

À la base, je voulais localiser les achats du Fonds Barnier, mais je n’ai rien trouvé … sniff.

J’ai aussi fait un programme ( https://github.com/CYBERNEURONES/Python/blob/master/API_DVF_ToStaticMapColor_V2.py )  pour travailler avec le code insee …. mais pas mieux :

Toutes les données

Expropriation, Adjudication.

A suivre …

Python : How to migrate data of Awesome Note 2 (bridworks.com) to Joplin ?

Awesome Note 2 is very popular on iPad:

The new All-in-one Organizer, Awesome Note 2 is integrated with note and schedule management.
And now it’s available!!

WONDERFUL WRITING FEATURES
· It can be used not only for simple notes, but also rich and wonderful writing tool.
· Make notes even more powerful to add photos, voice recording and drawings.
· Easily create diary notes to display feeling, weather or road map information.

SIMPLE, FLEXIBLE, AND FRIENDLY
·  Broadly use as diary to record everyday life, travel notes to write anywhere, photo albums, shopping lists, and record for work or study in any theme.
 
NOTE AND SCHEDULING AS ONE
· Manage your iOS Calendar and Reminders in one.
· Check todo lists and manage all schedules with calendar
· Receive notifications for important events and easily manage anniversaries such as birthdays.
 
NEAT AND STYLISH DESIGN
· Create your own style with tastefully designed icons, folders and various note backgrounds.
 
Capture all the memorable moments, stories, and everything in you. 

Step 0 : Install Joplin and activate the REST API ( https://joplin.cozic.net/api/ ) .

Step 1 : Install Python.

Step 2 : Create a backup of Awesome Note (for example: aNote_13Folders_20170520_00_24_21_579Notes.anb).

Step 3 : Uncompress the backup (it is a zip archive).

Step 4 : Copy the script below, then set the token and the name of the uncompressed backup folder in it.

The script :

#
# Version 1 
# for Python 3
# 
#   ARIAS Frederic
#   Sorry ... It's difficult for me the python :)
#

import plistlib
import os
import glob

folder = "Put the name of folder"

import requests
import re
import json
from subprocess import Popen, PIPE
import xml.etree.ElementTree
import xml.etree.ElementTree as ET
from xml.dom import minidom
from bpylist import bplist
import base64

###

# Connection settings of the Joplin REST API.
# IP
ip = "127.0.0.1"
# Port
port = "41184"
# Token
token = "Put the token"
nb_import = 0
headers = {'Content-type': 'application/json', 'Accept': 'text/plain'}

# Endpoint URLs, each carrying the API token as a query parameter.
url_notes = (
    "http://"+ip+":"+port+"/notes?"
    "token="+token
)
url_folders = (
    "http://"+ip+":"+port+"/folders?"
    "token="+token
)
url_tags = (
    "http://"+ip+":"+port+"/tags?"
    "token="+token
)
# The endpoint is spelled "/resources" (the picture upload further down uses
# that spelling); the previous "/ressources" path was a typo.
url_ressources = (
    "http://"+ip+":"+port+"/resources?"
    "token="+token
)

# Init: id requested for the import notebook, and the id actually returned
# by Joplin once the notebook has been created.
Awesome_UID = "12345678901234567801234567890123"
Awesome_UID_real = ""

payload = {
    "id":Awesome_UID,
    "title":"Awesome Note Import"
}

# Create the notebook that will receive every imported note.
# NOTE: when this request fails, Awesome_UID_real stays empty and the notes
# created afterwards are sent with an empty parent_id.
try:
    resp = requests.post(url_folders, data=json.dumps(payload, separators=(',',':')), headers=headers)
    #time.sleep(1)
    resp.raise_for_status()
    resp_dict = resp.json()
    print(resp_dict)
    print("My ID")
    print(resp_dict['id'])
    Awesome_UID_real = resp_dict['id']
    save = str(resp_dict['id'])
except requests.exceptions.HTTPError as e:
    print("Bad HTTP status code:", e)
except requests.exceptions.RequestException as e:
    print("Network error:", e)

###
nb_picture = 0
order = 0

# For every .anote file of the backup: convert it to an XML property list,
# scan the XML for the note fields, create the note in Joplin, then upload
# the embedded pictures and link them from the note body.
files = os.listdir(folder)
for name in files:
    if not name.endswith('.anote'):
        continue
    name2 = os.path.splitext(name)[0]
    my_file = folder+"/"+name2+".xml"
    # plutil gets an argument list (no shell involved), so file names with
    # spaces, quotes or other special characters need no escaping.
    Popen(["plutil", "-convert", "xml1", folder+"/"+name, "-o", my_file]).wait()

    my_dict = {}
    nb_array = 0
    nb_integer = 0
    nb_real = 0
    nb_string = 0
    flag_ok = 0
    nb_data = 0
    liste_picture = []
    my_timestamp = 0
    my_title = ""
    my_body = ""
    my_id = ""
    my_picture_name = ""
    my_lat = ""
    my_long = ""
    my_address = ""

    # The fields are recognised by their position: elements are counted per
    # tag, starting from the "Version" key. iterparse() only reports "end"
    # events by default, so every element is complete when it is seen here.
    for event, elem in ET.iterparse(my_file):
        if elem.tag == 'array':
            key = elem.text
            nb_array += 1
        elif elem.tag == 'key':
            value = elem.text
            if value == 'Version':
                # Start of the note data: restart every counter.
                flag_ok = 1
                nb_array = 0
                nb_integer = 0
                nb_real = 0
                nb_string = 0
                nb_data = 0
        elif elem.tag == 'integer':
            nb_integer += 1
            value = elem.text
        elif elem.tag == 'real':
            nb_real += 1
            value = elem.text
            if flag_ok == 1 and nb_real == 1:
                # First real number after "Version": used as the note time,
                # multiplied by 1000 before being sent to Joplin.
                my_timestamp = float(value)*1000
        elif elem.tag == 'string':
            nb_string += 1
            value = elem.text
            if flag_ok == 1:
                # The checks below are deliberately sequential (not elif):
                # each one may shift nb_string, which changes what the
                # following checks see. Strings starting with "Apple" or
                # "Times" shift the position of the strings that follow.
                is_font = value is not None and value.startswith(("Apple", "Times"))
                if nb_string == 2 and value is not None:
                    if is_font:
                        nb_string += 1
                    elif len(value.split()) == 3:
                        # NOTE(review): the value is tested on whitespace but
                        # split on "|" - presumably "long|lat|address";
                        # confirm against a real backup.
                        parts = value.split("|")
                        if len(parts) == 3:
                            my_long, my_lat, my_address = parts
                            print("Address",my_long, my_lat, my_address)
                if nb_string == 4 and is_font:
                    nb_string -= 1
                if nb_string == 5 and is_font:
                    nb_string -= 2
                if nb_string == 5:
                    my_body = value
                    if my_body is None:
                        my_body = ""
                    else:
                        # Awesome Note markup -> Markdown emphasis.
                        my_body = my_body.replace("<@b>", "**").replace("</@b>", "**")
                        my_body = my_body.replace("<@u>", "*").replace("</@u>", "*")
                if nb_string == 6:
                    my_title = value
                    if my_title is None:
                        my_title = ""
                    elif len(my_title) == 36 and ' ' not in my_title:
                        # A 36-character title without spaces is replaced by
                        # the body (presumably an auto-generated identifier).
                        if len(my_body) > 0:
                            my_title = my_body
                if nb_string == 7:
                    # NOTE(review): [0:31] keeps 31 characters; this value is
                    # only used as a fallback id when the note creation fails.
                    my_id = (value or "").replace("-", "")[0:31]
        elif elem.tag == 'entry':
            my_dict[key] = value
            key = value = None
        elif elem.tag == 'data':
            nb_data += 1
            value = elem.text
            if value:
                # Embedded base64 picture: strip the layout whitespace,
                # decode it and save it under a running number.
                value_clean = re.sub(r"[ \t\n]", "", value)
                nb_picture += 1
                my_picture_name = str(nb_picture)+".jpg"
                liste_picture.append(my_picture_name)
                # base64.decodestring() no longer exists in Python 3.9+;
                # decodebytes() is its replacement.
                jpg_recovered = base64.decodebytes(value_clean.encode())
                with open(my_picture_name, "wb") as g:
                    g.write(jpg_recovered)
        elem.clear()

    if len(my_title) > 150:
        print("Error",my_file)
    print("Filename:",my_file,"Data:",my_timestamp,",",my_title,",",my_id)
    nb_import += 1
    payload_note = {
        "parent_id":Awesome_UID_real,
        "title":my_title,
        "source":my_file,
        "order":nb_import,
        "user_created_time":my_timestamp,
        "user_updated_time":my_timestamp,
        "author":"Awesome Note",
        "body":my_body
    }
    # Second request: the same metadata is sent again with a PUT once the
    # note exists, together with the location when one was found.
    payload_note_put = {
        "source":my_file,
        "order":nb_import,
        "user_created_time":my_timestamp,
        "user_updated_time":my_timestamp,
        "author":"Awesome Note"
    }
    if len(my_address) > 0:
        payload_note_put["longitude"] = float(my_long)
        payload_note_put["latitude"] = float(my_lat)

    # Fallback id, replaced by the id Joplin returns for the created note.
    myuid = my_id

    try:
        resp = requests.post(url_notes, json=payload_note)
        resp.raise_for_status()
        resp_dict = resp.json()
        print(resp_dict)
        myuid = resp_dict['id']
        my_id = myuid
    except requests.exceptions.HTTPError as e:
        print("Bad HTTP status code:", e)
        print("payload_note:", payload_note)
    except requests.exceptions.RequestException as e:
        print("Network error:", e)

    url_notes_put = (
        "http://"+ip+":"+port+"/notes/"+myuid+"?"
        "token="+token
    )

    try:
        resp = requests.put(url_notes_put, json=payload_note_put)
        resp.raise_for_status()
        resp_dict = resp.json()
        print(resp_dict)
    except requests.exceptions.HTTPError as e:
        print("Bad HTTP status code:", e)
        print("payload_note:", payload_note_put)
    except requests.exceptions.RequestException as e:
        print("Network error:", e)

    # Upload each extracted picture as a resource and append a link to it to
    # the note body.
    for my_picture_name in liste_picture:
        cmd = "curl -F 'data=@"+my_picture_name+"' -F 'props={\"title\":\""+my_picture_name+"\"}' http://"+ip+":"+port+"/resources?token="+token
        print("Command"+cmd)
        resp = os.popen(cmd).read()
        try:
            respj = json.loads(resp)
            myuid_picture = respj['id']
        except (ValueError, KeyError, TypeError):
            # No usable resource id: skip this picture rather than linking
            # the id of a previous upload (or failing on an undefined name).
            print('bad json: ', resp)
            continue
        print(myuid_picture)

        my_body = my_body + "\n  ![" + my_picture_name + "](:/" + myuid_picture + ")   \n"

        payload_note_put = {
            "body":my_body
        }

        try:
            resp = requests.put(url_notes_put, json=payload_note_put)
            resp.raise_for_status()
            resp_dict = resp.json()
            print(resp_dict)
        except requests.exceptions.HTTPError as e:
            print("Bad HTTP status code:", e)
            print("payload_note:", payload_note_put)
        except requests.exceptions.RequestException as e:
            print("Network error:", e)


With this script you get: title, body, pictures and location. (Tags are not imported, although that would be possible, and neither are folders.) Unfortunately it is impossible to use plistlib for this … sniff.