-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #842 from basedosdados/staging/br_cgu_emenda_parla…
…mentar [dados] br_cgu_emenda_parlamentar
- Loading branch information
Showing
5 changed files
with
213 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
17 changes: 17 additions & 0 deletions
17
pipelines/datasets/br_cgu_emendas_parlamentares/constants.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# -*- coding: utf-8 -*- | ||
from enum import Enum | ||
|
||
import numpy as np | ||
|
||
|
||
class constants(Enum): # pylint: disable=c0103 | ||
URL = "https://portaldatransparencia.gov.br/download-de-dados/emendas-parlamentares/UNICO" | ||
INPUT = "/tmp/input/" | ||
OUTPUT = "/tmp/output/" | ||
VALUES_FLOAT = ['Valor Empenhado', 'Valor Liquidado', 'Valor Pago', 'Valor Restos A Pagar Inscritos', 'Valor Restos A Pagar Cancelados', 'Valor Restos A Pagar Pagos'] | ||
QUERY = """SELECT | ||
(SELECT count(*) as total FROM `basedosdados.br_cgu_emendas_parlamentares.microdados`) AS total, | ||
(SELECT TIMESTAMP_MILLIS(creation_time) as last_modified_time | ||
FROM `basedosdados.br_cgu_emendas_parlamentares.__TABLES_SUMMARY__` | ||
WHERE table_id = 'microdados') AS last_modified_time; | ||
""" |
110 changes: 110 additions & 0 deletions
110
pipelines/datasets/br_cgu_emendas_parlamentares/flows.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
# -*- coding: utf-8 -*- | ||
from datetime import timedelta | ||
from prefect import Parameter, case | ||
from prefect.run_configs import KubernetesRun | ||
from prefect.storage import GCS | ||
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run | ||
from pipelines.constants import constants | ||
from pipelines.datasets.br_cgu_emendas_parlamentares.schedules import ( | ||
every_day_emendas_parlamentares | ||
) | ||
from pipelines.datasets.br_cgu_emendas_parlamentares.tasks import ( | ||
convert_str_to_float, | ||
get_last_modified_time | ||
) | ||
from pipelines.utils.constants import constants as utils_constants | ||
from pipelines.utils.decorators import Flow | ||
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants | ||
from pipelines.utils.metadata.tasks import ( | ||
update_django_metadata, | ||
check_if_data_is_outdated, | ||
) | ||
from pipelines.utils.tasks import ( # update_django_metadata, | ||
create_table_and_upload_to_gcs, | ||
get_current_flow_labels, | ||
rename_current_flow_run_dataset_table, | ||
) | ||
|
||
|
||
with Flow( | ||
name="br_cgu_emendas_parlamentares.microdados", | ||
code_owners=[ | ||
"tricktx", | ||
], | ||
) as br_cgu_emendas_parlamentares_flow: | ||
dataset_id = Parameter("dataset_id", default="br_cgu_emendas_parlamentares", required=False) | ||
table_id = Parameter("table_id", default="microdados", required=False) | ||
update_metadata = Parameter("update_metadata", default=False, required=False) | ||
materialization_mode = Parameter("materialization_mode", default="dev", required=False) | ||
materialize_after_dump = Parameter("materialize_after_dump", default=False, required=False) | ||
dbt_alias = Parameter("dbt_alias", default=True, required=False) | ||
rename_flow_run = rename_current_flow_run_dataset_table(prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id) | ||
|
||
max_modified_time = get_last_modified_time() | ||
|
||
outdated = check_if_data_is_outdated( | ||
dataset_id=dataset_id, | ||
table_id=table_id, | ||
date_type="last_update_date", | ||
data_source_max_date=max_modified_time, | ||
upstream_tasks=[max_modified_time], | ||
) | ||
|
||
with case(outdated, True): | ||
output_path = convert_str_to_float() | ||
wait_upload_table = create_table_and_upload_to_gcs( | ||
data_path=output_path, | ||
dataset_id=dataset_id, | ||
table_id=table_id, | ||
dump_mode="append", | ||
wait=output_path, | ||
upstream_tasks=[output_path], | ||
) | ||
|
||
|
||
with case(materialize_after_dump, True): | ||
# Trigger DBT flow run | ||
current_flow_labels = get_current_flow_labels() | ||
materialization_flow = create_flow_run( | ||
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value, | ||
project_name=constants.PREFECT_DEFAULT_PROJECT.value, | ||
parameters={ | ||
"dataset_id": dataset_id, | ||
"table_id": table_id, | ||
"mode": materialization_mode, | ||
"dbt_alias": dbt_alias, | ||
"dbt_command": "run/test", | ||
"disable_elementary": False, | ||
}, | ||
labels=current_flow_labels, | ||
run_name=f"Materialize {dataset_id}.{table_id}", | ||
upstream_tasks = [wait_upload_table] | ||
) | ||
|
||
wait_for_materialization = wait_for_flow_run( | ||
materialization_flow, | ||
stream_states=True, | ||
stream_logs=True, | ||
raise_final_state=True, | ||
) | ||
wait_for_materialization.max_retries = ( | ||
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value | ||
) | ||
wait_for_materialization.retry_delay = timedelta( | ||
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value | ||
) | ||
with case(update_metadata, True): | ||
update_django_metadata( | ||
dataset_id=dataset_id, | ||
table_id=table_id, | ||
date_format="%Y", | ||
coverage_type="all_bdpro", | ||
prefect_mode=materialization_mode, | ||
historical_database=False, | ||
bq_project="basedosdados", | ||
upstream_tasks=[wait_for_materialization], | ||
) | ||
|
||
br_cgu_emendas_parlamentares_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value) | ||
br_cgu_emendas_parlamentares_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value) | ||
br_cgu_emendas_parlamentares_flow.schedule = every_day_emendas_parlamentares |
27 changes: 27 additions & 0 deletions
27
pipelines/datasets/br_cgu_emendas_parlamentares/schedules.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# -*- coding: utf-8 -*- | ||
from datetime import datetime | ||
|
||
from prefect.schedules import Schedule | ||
from prefect.schedules.clocks import CronClock, IntervalClock | ||
|
||
from pipelines.constants import constants | ||
|
||
every_day_emendas_parlamentares = Schedule( | ||
clocks=[ | ||
CronClock( | ||
cron="30 19 * * *", # At 19:30 every day | ||
start_date=datetime(2021, 3, 31, 17, 11), | ||
labels=[ | ||
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value, | ||
], | ||
parameter_defaults={ | ||
"dataset_id": "br_cgu_emenda_parlamentar", | ||
"table_id": "emenda_parlamentar", | ||
"materialization_mode": "prod", | ||
"materialize_after_dump": True, | ||
"dbt_alias": True, | ||
"update_metadata": True, | ||
}, | ||
), | ||
], | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
# -*- coding: utf-8 -*- | ||
from urllib.request import urlopen | ||
from datetime import datetime | ||
import zipfile | ||
import basedosdados as bd | ||
import pandas as pd | ||
from io import BytesIO | ||
from pipelines.utils.utils import log | ||
from pipelines.datasets.br_cgu_emendas_parlamentares.constants import constants | ||
from prefect import task | ||
import os | ||
|
||
def download_unzip_file(): | ||
if not os.path.exists(constants.INPUT.value): | ||
os.mkdir(constants.INPUT.value) | ||
try: | ||
r = urlopen(constants.URL.value) | ||
zip = zipfile.ZipFile(BytesIO(r.read())) | ||
zip.extractall(path=constants.INPUT.value) | ||
except Exception as e: | ||
print(e) | ||
log("Erro ao baixar e descompactar arquivo") | ||
|
||
@task | ||
def convert_str_to_float(): | ||
df = pd.read_csv(f"{constants.INPUT.value}Emendas.csv", sep=';', encoding='latin1') | ||
log("Convertendo valores para float") | ||
|
||
for _ in constants.VALUES_FLOAT.value: | ||
df[_] = df[_].str.replace(',', '.').astype(float) | ||
|
||
output = f"{constants.OUTPUT.value}microdados.csv" | ||
|
||
if not os.path.exists(constants.OUTPUT.value): | ||
os.mkdir(constants.OUTPUT.value) | ||
|
||
df.to_csv(output, sep=',', encoding='utf-8', index=False) | ||
log("---------------- Tabela salva -------------------") | ||
return constants.OUTPUT.value | ||
|
||
@task | ||
def get_last_modified_time(): | ||
download_unzip_file() | ||
emendas = pd.read_csv(f"{constants.INPUT.value}Emendas.csv", sep=';', encoding='latin1') | ||
|
||
data = bd.read_sql( | ||
constants.QUERY.value, | ||
billing_project_id="basedosdados", | ||
from_file=True ) | ||
|
||
date = data.iloc[0].values | ||
log("Data da última atualização: " + str(date[1])) | ||
log("Quantidade de linhas na tabela: " + str(date[0])) | ||
log("Quantidade de linhas no arquivo: " + str(emendas.shape[0])) | ||
if emendas.shape[0] > date[0]: | ||
return datetime.today() | ||
|
||
return date[1] |