This repository has been archived by the owner on Nov 15, 2024. It is now read-only.

WIP : Jimmyd/impact retour emploi #384

Open

wants to merge 31 commits into master

31 commits
841a83d
WIP Add Joris work about impact retour emploi
JimmyDore Jun 27, 2019
c51259f
Update setup.py to create executables in virtualenv for ire scripts
JimmyDore Jun 28, 2019
960aebf
wip
JimmyDore Jul 3, 2019
389034d
Industrialize daily copy script
JimmyDore Jul 5, 2019
a5f6d0f
Fix scripts launcher
JimmyDore Jul 5, 2019
5589f81
Add logs informations
JimmyDore Jul 5, 2019
7923a64
Add Exception for the daily parser script
JimmyDore Jul 5, 2019
f6f36b2
Clean and prepare jobs join & clean activity_logs-dpae for Jenkins
JimmyDore Jul 9, 2019
04902f2
Remove debug mode
JimmyDore Jul 9, 2019
d4e8756
Add log about size of DPAE file
JimmyDore Jul 9, 2019
c333095
wip make report
JimmyDore Jul 10, 2019
22aaf5f
Fix (approximately) issues with path
JimmyDore Jul 10, 2019
adfbcb1
Fix last problem with path
JimmyDore Jul 10, 2019
b3693ce
Add settings file with different paths
JimmyDore Jul 10, 2019
17e4c6f
Fix import module charts
JimmyDore Jul 10, 2019
0292e93
Add useful libs to install in DockerFile
JimmyDore Jul 11, 2019
a503443
Add xvfb to run imgkit from Docker image
JimmyDore Jul 11, 2019
bf21e56
Add comments on main script to make charts and excel report
JimmyDore Jul 12, 2019
5d439e3
Update name of DPAE file to be used
JimmyDore Sep 23, 2019
6ff55df
Add function to parse activity logs for PSE study
JimmyDore Nov 13, 2019
571b82f
Update the way to check if a file needs to be used or not
JimmyDore Nov 20, 2019
5299955
Add option to join data on SIREN (or SIRET as before)
JimmyDore Dec 18, 2019
98a068b
Remove debug mode
JimmyDore Dec 18, 2019
cd71c44
Fix import
JimmyDore Dec 18, 2019
d2e40a3
Fix check existence of csv generated file
JimmyDore Dec 18, 2019
a68b5ed
Fix SIREN issue int/str
JimmyDore Dec 24, 2019
5ab18af
Fix types of columns siren/siret
JimmyDore Dec 26, 2019
e9c9653
Fix pandas bug
JimmyDore Dec 26, 2019
ccf6a21
Try with SIRET to compare data
JimmyDore Dec 26, 2019
fc35a7a
Fix path to dpae file
JimmyDore Dec 26, 2019
43d3b82
Fix siren bug
Feb 18, 2020
4 changes: 3 additions & 1 deletion Dockerfile
@@ -18,6 +18,8 @@ RUN apt update && \
libssl-dev \
# scipy
gfortran libblas-dev liblapack-dev libatlas-base-dev \
# impact retour emploi
wkhtmltopdf python3-cairo xvfb \
&& pip3 install virtualenv

# Install python requirements
@@ -38,4 +40,4 @@ RUN ../env/bin/flask assets build

# Run uwsgi
EXPOSE 8000
CMD ["../env/bin/uwsgi", "./docker/uwsgi.ini"]
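These packages back the reporting pipeline: per the commit messages, wkhtmltopdf and xvfb are added so imgkit can render charts from inside the Docker image, where no X display is available. A minimal sketch of such a rendering call — the filenames are assumptions, and the report script itself is not shown in this excerpt:

import imgkit

# Hypothetical rendering step: wkhtmltoimage (bundled with wkhtmltopdf)
# needs an X server, hence xvfb in the image.
imgkit.from_file('chart.html', 'chart.png')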
22 changes: 21 additions & 1 deletion Makefile
@@ -227,6 +227,26 @@ alembic-generate-migration:
@echo
@echo " $$ alembic revision -m 'create account table'"

# Impact retour à l'emploi
# ------------------------
daily-json-activity-parser:
export LBB_ENV=development && cd $(PACKAGE_DIR) && python scripts/impact_retour_emploi/daily_json_activity_parser.py

join_activity_logs_and_dpae:
export LBB_ENV=development && cd $(PACKAGE_DIR) && python scripts/impact_retour_emploi/join_activity_logs_dpae.py

clean_activity_logs_and_dpae:
export LBB_ENV=development && cd $(PACKAGE_DIR) && python scripts/impact_retour_emploi/clean_activity_logs_dpae.py

make_report:
export LBB_ENV=development && cd $(PACKAGE_DIR) && python scripts/impact_retour_emploi/make_report.py

run_ire_jobs:
make join_activity_logs_and_dpae && \
make clean_activity_logs_and_dpae && \
make make_report && \
echo "The new report has been built successfully."

# Importer jobs
# -------------

@@ -322,4 +342,4 @@ clean-car-isochrone-and-durations-cache: clean-car-isochrone-cache

delete-unused-redis-containers:
docker ps -f status=restarting -f name=redis --format "{{.ID}}" \
| xargs docker rm -f
10 changes: 10 additions & 0 deletions labonneboite/importer/util.py
@@ -6,8 +6,10 @@
from datetime import datetime
import logging
from functools import lru_cache
import urllib

import MySQLdb as mdb
from sqlalchemy import create_engine

from labonneboite.common import departements as dpt
from labonneboite.common.util import timeit
@@ -38,6 +40,14 @@ def create_cursor():
cur = con.cursor()
return con, cur

def create_sqlalchemy_engine():
connexion_url = ('mysql://'+DATABASE['USER']+':%s@'+\
DATABASE['HOST']+':'+str(DATABASE['PORT'])+\
'/'+DATABASE['NAME']) % urllib.parse.quote_plus(DATABASE['PASSWORD'])

engine = create_engine(connexion_url)

return engine.connect()

Contributor:
I think it would be more readable to drop the "+" concatenation and use format instead. Example: f"mysql://{DATABASE['USER']}:%s@".

Contributor:
TIL, new syntax to me — I only knew '{} {}'.format('one', 'two').
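A minimal sketch of the reviewer's suggestion (same DATABASE settings; the '%s' placeholder is kept so the password is still URL-quoted separately):

def create_sqlalchemy_engine():
    # Hypothetical f-string rewrite of the URL above, per the review comment.
    connexion_url = (
        f"mysql://{DATABASE['USER']}:%s@"
        f"{DATABASE['HOST']}:{DATABASE['PORT']}/{DATABASE['NAME']}"
    ) % urllib.parse.quote_plus(DATABASE['PASSWORD'])
    return create_engine(connexion_url).connect()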

def check_for_updates(input_folder):
"""
183 changes: 183 additions & 0 deletions labonneboite/scripts/impact_retour_emploi/clean_activity_logs_dpae.py
@@ -0,0 +1,183 @@
from datetime import date
Contributor:
A general module docstring wouldn't hurt, to explain what the script does and how to use it, with long-term maintainability (MCO) in mind.
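A sketch of what such a docstring could look like (wording illustrative, not from the PR):

"""Clean the joined activity-logs/DPAE csv file and load it into MySQL.

Reads the act_dpae-<date>.csv produced by join_activity_logs_dpae.py,
filters and renames its columns, then rewrites the act_dpae_clean table.

Usage: make clean_activity_logs_and_dpae
"""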

import time
import pandas as pd
from labonneboite.importer import util as import_util
from labonneboite.importer import settings as importer_settings
from labonneboite.importer.jobs.common import logger
Contributor:
It would be nice to separate the standard-library imports from the external ones and from the project's own, with a blank line between each group.

from labonneboite.scripts.impact_retour_emploi.settings_path_charts import DEBUG, JOIN_ON_SIREN
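For instance, the imports above could be regrouped as the reviewer suggests (same modules, only reordered):

# Standard library
import time
from datetime import date

# Third party
import pandas as pd

# Project
from labonneboite.importer import util as import_util
from labonneboite.importer import settings as importer_settings
from labonneboite.importer.jobs.common import logger
from labonneboite.scripts.impact_retour_emploi.settings_path_charts import DEBUG, JOIN_ON_SIREN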


def clean_csv_act_dpae_file():

dpae_folder_path = importer_settings.INPUT_SOURCE_FOLDER + '/'

# FIXME: find the generated file some other way
timestr = time.strftime("%Y-%m-%d")
csv_path = f"{dpae_folder_path}act_dpae-{timestr}.csv"

df_dpae_act = pd.read_csv(csv_path,
sep='|',
header=0)
if JOIN_ON_SIREN:
df_dpae_act.siren = df_dpae_act.siren.astype(str)

df_dpae_act.siret = df_dpae_act.siret.astype(str)

logger.info("The .csv file generated to clean has {} rows".format(
df_dpae_act.shape[0]))

df_dpae_act = df_dpae_act[df_dpae_act.premiere_embauche == 'Embauche']
logger.info(
"The .csv file minus rows that are not 'premiere embauche' has {} rows".format(df_dpae_act.shape[0]))

# remove duplicates when multiple activities for the same dpae
df_dpae_act = df_dpae_act.sort_values('dateheure')

if JOIN_ON_SIREN:
column_join_etab = 'siren'
else:
column_join_etab = 'siret'

df_dpae_act = df_dpae_act.drop_duplicates(
subset=['idutilisateur-peconnect', column_join_etab], keep='first')
logger.info(
"The .csv file minus duplicates has {} rows ".format(df_dpae_act.shape[0]))

# rename some columns
df_dpae_act.rename(columns={'dateheure': 'date_activite',
'kd_dateembauche': 'date_embauche',
'nbrjourtravaille': 'duree_activite_cdd_jours',
'kn_trancheage': 'tranche_age',
'duree_pec': 'duree_prise_en_charge',
'dc_commune_id': 'code_postal'
},
inplace=True)

Contributor:
Wouldn't it be more logical to gather the functions at the start of the script, before the main function called by run_main? Right now everything is mixed together. I also have trouble seeing why several functions are defined inside clean_csv_act_dpae_file when they could be defined beforehand and be independent. That would greatly help unit-testing them.

def get_type_contrat(row):
if row['dc_typecontrat_id'] == 1:
return 'CDD'
elif row['dc_typecontrat_id'] == 2:
return 'CDI'
return 'CTT'
df_dpae_act['type_contrat'] = df_dpae_act.apply(
lambda row: get_type_contrat(row), axis=1)
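A sketch of the extraction the reviewer has in mind — the same helper at module level, plus a hypothetical unit test:

def get_type_contrat(row):
    # Module-level version of the helper above; importable and testable in isolation.
    if row['dc_typecontrat_id'] == 1:
        return 'CDD'
    elif row['dc_typecontrat_id'] == 2:
        return 'CDI'
    return 'CTT'

def test_get_type_contrat():
    assert get_type_contrat({'dc_typecontrat_id': 1}) == 'CDD'
    assert get_type_contrat({'dc_typecontrat_id': 2}) == 'CDI'
    assert get_type_contrat({'dc_typecontrat_id': 99}) == 'CTT'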

def get_nb_mois(row):
return row['duree_activite_cdd_jours'] // 30
df_dpae_act['duree_activite_cdd_mois'] = df_dpae_act.apply(
lambda row: get_nb_mois(row), axis=1)

def get_nbr_jours_act_emb(row):
de = row['date_embauche'][:10].split('-')
da = row['date_activite'][:10].split('-')
f_date = date(int(da[0]), int(da[1]), int(da[2]))
l_date = date(int(de[0]), int(de[1]), int(de[2]))
delta = l_date - f_date
return delta.days
df_dpae_act['diff_activite_embauche_jrs'] = df_dpae_act.apply(
lambda row: get_nbr_jours_act_emb(row), axis=1)

def get_priv_pub(row):
if row['dc_privepublic'] == 0:
return 'Public'
return 'Prive'
df_dpae_act['dc_privepublic'] = df_dpae_act.apply(
lambda row: get_priv_pub(row), axis=1)

def good_format(row):
return row['date_embauche'][:-2]
df_dpae_act['date_embauche'] = df_dpae_act.apply(
lambda row: good_format(row), axis=1)

def del_interrogation(row):
if row['tranche_age'] == 'de 26 ans ? 50 ans':
return 'entre 26 et 50 ans'
return row['tranche_age']
df_dpae_act['tranche_age'] = df_dpae_act.apply(
lambda row: del_interrogation(row), axis=1)

def del_cdd_incoherent(row):
try:
if int(row['duree_activite_cdd_jours']) > 1200:
return 1
return 0
except (ValueError, TypeError): # non-numeric or missing duration
return 0
df_dpae_act['temporaire'] = df_dpae_act.apply(
lambda row: del_cdd_incoherent(row), axis=1)
df_dpae_act = df_dpae_act[df_dpae_act.temporaire == 0]
logger.info(
"The .csv file minus contracts too long to be legal has {} rows".format(df_dpae_act.shape[0]))

# For August we only have activities on 31/08/2018 --> ugly charts, so we start from September 1st
df_dpae_act = df_dpae_act[df_dpae_act.date_activite > "2018-08-31"]
logger.info(
"The .csv file minus activities dated 31/08/2018 has {} rows".format(df_dpae_act.shape[0]))

cols_of_interest = ['idutilisateur_peconnect',
'siret',
'date_activite',
'date_embauche',
'type_contrat',
'duree_activite_cdd_mois',
'duree_activite_cdd_jours',
'diff_activite_embauche_jrs',
'dc_lblprioritede',
'tranche_age',
'dc_privepublic',
'duree_prise_en_charge',
'dn_tailleetablissement',
'code_postal']

if JOIN_ON_SIREN:
cols_of_interest.append('siren')

df_dpae_act = df_dpae_act[cols_of_interest]

engine = import_util.create_sqlalchemy_engine()
Contributor:
In my opinion, the preceding part (cleaning and formatting the data) should be kept separate from the next part (inserting the new data into the database). That would make unit testing easier and debugging simpler if something goes wrong.
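A sketch of the split the reviewer suggests (function names are illustrative, not from the PR):

def clean_act_dpae_dataframe(df_dpae_act):
    # Pure-pandas part: all the filtering/renaming done above,
    # unit-testable on an in-memory DataFrame, no MySQL needed.
    return df_dpae_act

def save_act_dpae_to_sql(df_dpae_act, table_name):
    # DB-touching part, kept apart so failures are easy to localize.
    engine = import_util.create_sqlalchemy_engine()
    df_dpae_act.to_sql(con=engine, name=table_name,
                       if_exists='replace', index=False, chunksize=10000)
    engine.close()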


table_name_act_dpae = 'act_dpae_clean_siren' if JOIN_ON_SIREN is True else 'act_dpae_clean'
query = f"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name_act_dpae}'"

existing_sql_table = False
if engine.execute(query).fetchone()[0] == 1: # table exists
existing_sql_table = True

if existing_sql_table:

query = f"select * from {table_name_act_dpae}"
df_dpae_act_existing = pd.read_sql_query(query, engine)

if JOIN_ON_SIREN:
df_dpae_act_existing.siren = df_dpae_act_existing.siren.astype(str)
else:
df_dpae_act_existing.siret = df_dpae_act_existing.siret.astype(str)

# In case a problem appears in the script, we save the old data to a .csv file.
# Because we rewrite the whole table on each execution, we have to remove duplicates.

df_dpae_act_existing.to_csv(
f"{dpae_folder_path}backup_sql_{table_name_act_dpae}", encoding='utf-8', sep='|')
logger.info(
"There were already act/dpae : {} rows".format(df_dpae_act_existing.shape[0]))
df_dpae_act = pd.concat([df_dpae_act, df_dpae_act_existing])
logger.info("Concatenation of both has {} rows".format(
df_dpae_act.shape[0]))

df_dpae_act = df_dpae_act.drop_duplicates(
subset=['idutilisateur_peconnect', column_join_etab], keep='first')
logger.info(
"Concatenation of both minus duplicates has {} rows".format(df_dpae_act.shape[0]))

df_dpae_act.to_sql(con=engine, name=table_name_act_dpae,
if_exists='replace', index=False, chunksize=10000)

engine.close()


def run_main():
clean_csv_act_dpae_file()


if __name__ == '__main__':
run_main()