Skip to content

Commit

Permalink
Merge branch 'main' into staging/br_tse_eleicoes
Browse files Browse the repository at this point in the history
  • Loading branch information
Winzen committed Oct 22, 2024
2 parents 7e0c268 + 10defe7 commit 52b5d1f
Show file tree
Hide file tree
Showing 18 changed files with 520 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ env:
jobs:
build-container:
name: Build Docker image
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cd-staging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
build-container:
if: startsWith(github.head_ref, 'staging/')
name: Register flows (staging)
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ env:
jobs:
build-container:
name: Register flows (production)
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
jobs:
lint:
name: Lint dockerfile
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/ci-python.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ on:
jobs:
lint:
name: Lint python
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Lint source code
uses: chartboost/ruff-action@v1
test:
name: Test python
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4
Expand All @@ -24,7 +24,7 @@ jobs:
with:
cache: poetry
architecture: x64
python-version: '3.11'
python-version: '3.10'
- name: Install requirements
run: poetry install --only=test
- name: Test
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
strategy:
matrix:
arch: [x64]
os: [ubuntu-latest]
os: [ubuntu-22.04]
python-version: [3.10.x]
steps:
- name: Checkout
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/docs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ on:
jobs:
docs:
name: Deploy docs
runs-on: ubuntu-latest
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v4
Expand Down
2 changes: 2 additions & 0 deletions pipelines/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from pipelines.datasets.br_rf_cafir.flows import *
from pipelines.datasets.br_rf_cno.flows import *
from pipelines.datasets.br_rj_isp_estatisticas_seguranca.flows import *
from pipelines.datasets.br_sfb_sicar.flows import *
from pipelines.datasets.br_sp_saopaulo_dieese_icv.flows import *
from pipelines.datasets.br_stf_corte_aberta.flows import *
from pipelines.datasets.br_tse_eleicoes.flows import *
Expand All @@ -59,3 +60,4 @@
from pipelines.datasets.br_ms_sih.flows import *
from pipelines.datasets.br_ms_sinan.flows import *
from pipelines.datasets.br_cgu_emendas_parlamentares.flows import *
from pipelines.datasets.br_cgu_cartao_pagamento.flows import *
32 changes: 32 additions & 0 deletions pipelines/datasets/br_cgu_cartao_pagamento/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
from copy import deepcopy, copy
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from pipelines.utils.crawler_cgu.flows import flow_cgu_cartao_pagamento
from pipelines.constants import constants
from pipelines.datasets.br_cgu_cartao_pagamento.schedules import (
every_day_microdados_compras_centralizadas,
every_day_microdados_defesa_civil,
every_day_microdados_governo_federal
)

br_cgu_cartao_pagamento__governo_federal = deepcopy(flow_cgu_cartao_pagamento)
br_cgu_cartao_pagamento__governo_federal.name = "br_cgu_cartao_pagamento.governo_federal"
br_cgu_cartao_pagamento__governo_federal.code_owners = ["trick"]
br_cgu_cartao_pagamento__governo_federal.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cgu_cartao_pagamento__governo_federal.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cgu_cartao_pagamento__governo_federal.schedule = every_day_microdados_governo_federal

br_cgu_cartao_pagamento__defesa_civil = deepcopy(flow_cgu_cartao_pagamento)
br_cgu_cartao_pagamento__defesa_civil.name = "br_cgu_cartao_pagamento.defesa_civil"
br_cgu_cartao_pagamento__defesa_civil.code_owners = ["trick"]
br_cgu_cartao_pagamento__defesa_civil.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cgu_cartao_pagamento__defesa_civil.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cgu_cartao_pagamento__defesa_civil.schedule = every_day_microdados_defesa_civil

br_cgu_cartao_pagamento__compras_centralizadas = deepcopy(flow_cgu_cartao_pagamento)
br_cgu_cartao_pagamento__compras_centralizadas.name = "br_cgu_cartao_pagamento.compras_centralizadas"
br_cgu_cartao_pagamento__compras_centralizadas.code_owners = ["trick"]
br_cgu_cartao_pagamento__compras_centralizadas.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
br_cgu_cartao_pagamento__compras_centralizadas.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
br_cgu_cartao_pagamento__compras_centralizadas.schedule = every_day_microdados_compras_centralizadas
69 changes: 69 additions & 0 deletions pipelines/datasets/br_cgu_cartao_pagamento/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
from datetime import datetime
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock, IntervalClock
from pipelines.constants import constants
from pipelines.utils.crawler_cgu.constants import constants as constants_cgu

every_day_microdados_governo_federal = Schedule(
clocks=[
CronClock(
cron="0 20 * * *",
start_date=datetime(2021, 3, 31, 17, 11),
labels=[
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "br_cgu_cartao_pagamento",
"table_id": "microdados_governo_federal",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": True,
"historical_data": False,
"update_metadata": True,
},
),
],
)

every_day_microdados_defesa_civil = Schedule(
clocks=[
CronClock(
cron="30 20 * * *",
start_date=datetime(2021, 3, 31, 17, 11),
labels=[
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "br_cgu_cartao_pagamento",
"table_id": "microdados_defesa_civil",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": True,
"historical_data": False,
"update_metadata": True,
},
),
],
)

every_day_microdados_compras_centralizadas = Schedule(
clocks=[
CronClock(
cron="00 21 * * *",
start_date=datetime(2021, 3, 31, 17, 11),
labels=[
constants.BASEDOSDADOS_PROD_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "br_cgu_cartao_pagamento",
"table_id": "microdados_compras_centralizadas",
"materialization_mode": "prod",
"materialize_after_dump": True,
"dbt_alias": True,
"historical_data": False,
"update_metadata": True,
},
),
],
)
Empty file.
Empty file.
35 changes: 35 additions & 0 deletions pipelines/utils/crawler_cgu/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
"""
Constant values for the datasets projects
"""

from enum import Enum
from datetime import datetime

class constants(Enum): # pylint: disable=c0103
"""
Constant values for the br_cgu_cartao_pagamento project
"""

TABELA = {
"microdados_governo_federal" : {
"INPUT_DATA" : "/tmp/input/microdados_governo_federal",
"OUTPUT_DATA" : "/tmp/output/microdados_governo_federal",
"URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpgf/",
"READ" : "_CPGF",
"ONLY_ONE_FILE" : False},

"microdados_compras_centralizadas" : {
"INPUT_DATA" : "/tmp/input/microdados_compras_centralizadas",
"OUTPUT_DATA" : "/tmp/output/microdados_compras_centralizadas",
"URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpcc/",
"READ" : "_CPGFComprasCentralizadas",
"ONLY_ONE_FILE" : False},

"microdados_defesa_civil" : {
"INPUT_DATA" : "/tmp/input/microdados_defesa_civil",
"OUTPUT_DATA" : "/tmp/output/microdados_defesa_civil",
"URL" : "https://portaldatransparencia.gov.br/download-de-dados/cpdc/",
"READ" : "_CPDC",
"ONLY_ONE_FILE" : False}
}
120 changes: 120 additions & 0 deletions pipelines/utils/crawler_cgu/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
"""
Flows for br_cgu_cartao_pagamento
"""
from datetime import timedelta
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect import Parameter, case
from pipelines.constants import constants
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from pipelines.utils.constants import constants as utils_constants
from pipelines.utils.decorators import Flow
from pipelines.utils.crawler_cgu.tasks import (
partition_data,
get_current_date_and_download_file,
)
from pipelines.utils.decorators import Flow
from pipelines.utils.execute_dbt_model.constants import constants as dump_db_constants
from pipelines.utils.metadata.tasks import update_django_metadata, check_if_data_is_outdated
from pipelines.utils.tasks import (
create_table_and_upload_to_gcs,
get_current_flow_labels,
rename_current_flow_run_dataset_table,
)

with Flow(
name="CGU - Cartão de Pagamento"
) as flow_cgu_cartao_pagamento:

dataset_id = Parameter("dataset_id", default='br_cgu_cartao_pagamento', required=True)
table_id = Parameter("table_id", default ="microdados_governo_federal", required=True)
####
# Relative_month = 1 means that the data will be downloaded for the current month
####
relative_month = Parameter("relative_month", default=1, required=False)
materialization_mode = Parameter("materialization_mode", default="dev", required=False)
materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False)
dbt_alias = Parameter("dbt_alias", default=True, required=False)
update_metadata = Parameter("update_metadata", default=False, required=False)
rename_flow_run = rename_current_flow_run_dataset_table(prefix="Dump: ", dataset_id=dataset_id, table_id=table_id, wait=table_id)

data_source_max_date = get_current_date_and_download_file(
table_id,
dataset_id,
relative_month,
)

dados_desatualizados = check_if_data_is_outdated(
dataset_id=dataset_id,
table_id=table_id,
data_source_max_date=data_source_max_date,
date_format="%Y-%m",
upstream_tasks=[data_source_max_date]
)

with case(dados_desatualizados, True):

filepath = partition_data(
table_id=table_id,
upstream_tasks=[dados_desatualizados]
)

wait_upload_table = create_table_and_upload_to_gcs(
data_path=filepath,
dataset_id=dataset_id,
table_id=table_id,
dump_mode="append",
wait=filepath,
upstream_tasks=[filepath],
)

with case(materialize_after_dump, True):

current_flow_labels = get_current_flow_labels()
materialization_flow = create_flow_run(
flow_name=utils_constants.FLOW_EXECUTE_DBT_MODEL_NAME.value,
project_name=constants.PREFECT_DEFAULT_PROJECT.value,
parameters={
"dataset_id": dataset_id,
"table_id": table_id,
"mode": materialization_mode,
"dbt_alias": dbt_alias,
"dbt_command": "run/test",
"disable_elementary": False,
},
labels=current_flow_labels,
run_name=f"Materialize {dataset_id}.{table_id}",
upstream_tasks=[wait_upload_table],
)

wait_for_materialization = wait_for_flow_run(
materialization_flow,
stream_states=True,
stream_logs=True,
raise_final_state=True,
upstream_tasks=[materialization_flow],
)
wait_for_materialization.max_retries = (
dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_ATTEMPTS.value
)
wait_for_materialization.retry_delay = timedelta(
seconds=dump_db_constants.WAIT_FOR_MATERIALIZATION_RETRY_INTERVAL.value
)
with case(update_metadata, True):
update_django_metadata(
dataset_id=dataset_id,
table_id=table_id,
date_column_name={"year": "ano_extrato", "month": "mes_extrato"},
date_format="%Y-%m",
coverage_type="part_bdpro",
time_delta={"months": 6},
prefect_mode=materialization_mode,
bq_project="basedosdados",
upstream_tasks=[wait_for_materialization],
)

flow_cgu_cartao_pagamento.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
flow_cgu_cartao_pagamento.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value
)
Loading

0 comments on commit 52b5d1f

Please sign in to comment.