Projeto: ETL com Apache Airflow usando a API de Dados Abertos da Assembleia Legislativa de Minas

Posted by : on

Categoria : python   apache_airflow   sql_server


Projeto: ETL com Apache Airflow usando a API de Dados Abertos da Assembleia Legislativa de Minas

2. Requisitos Funcionais

Coleta de dados

O sistema vai coletar dados do endpoint:

  • https://dadosabertos.almg.gov.br/ws/proposicoes/pesquisa/direcionada?tp=100&formato=json&ano=2024&ord=3&p=50000&ini=20250123&fim=20250126, utilizando os filtros de paginação, data de início e data de fim.
  • https://dadosabertos.almg.gov.br/ws/proposicoes/pesquisa/direcionada?formato=json&ano=2024&num=11063: URL de exemplo para fazer o reprocessamento.

Processamento de dados

O sistema deverá realizar o tratamento de caracteres, como:

  • Remover espaços desnecessários;
  • Remover caracteres de quebra de linha, deixando o texto em uma única linha;
  • Formatar datas corretamente.

Armazenamento de dados

Os dados serão armazenados no banco SQL Server.

3. Requisitos Não Funcionais

  • Execução: O sistema deve rodar com um intervalo de um dia para envio de dados.
  • Monitoramento: O sistema deve incluir logs de execução, registrando históricos e erros para reprocessamento.

4. Estrutura do Banco de Dados

Tabela proposicao

CREATE TABLE proposicao (
    ID INTEGER IDENTITY(1,1) PRIMARY KEY,
    AUTOR NVARCHAR(300),
    DATA_PRESENTACAO DATETIME,
    EMENTA NVARCHAR(MAX),
    REGIME VARCHAR(50),
    SITUACCAO VARCHAR(80),
    TIPO_PROPOSICAO VARCHAR(100),
    NUMERO VARCHAR(40) UNIQUE,
    ANO INTEGER,
    CIDADE VARCHAR(60),
    ESTADO VARCHAR(80),
    DATA_INSERSAO_REGISTRO DATETIME DEFAULT GETDATE(),
    DATA_ATUALIZACAO_REGISTRO DATETIME
);

Dicionário de dados da tabela proposicao

Campo Tipo Descrição
ID Incremental ID automático
AUTOR String Autor da proposição, ex. “Governador Romeu Zema Neto”
DATA_PRESENTACAO Timestamp Data de apresentação da proposição, ex. “2022-10-06T00:00:00Z”
EMENTA String Assunto da proposição, ex. “Encaminha o Projeto de Lei 4008 2022…”
REGIME String Regime de tramitação da proposição, ex. “Especial”
SITUACCAO String Situação atual da proposição, ex. “Publicado”
TIPO_PROPOSICAO String Tipo da proposição, ex. “MSG”
NUMERO String Número da proposição, ex. “300”
ANO Integer Ano da proposição, ex. 2022
CIDADE String Cidade fixa “Belo Horizonte”
ESTADO String Estado fixo “Minas Gerais”
DATA_INSERSAO DATETIME Data de inserção do registro: Padrão: DATA ATUAL
DATA_ATUALIZACAO_REGISTRO DATETIME Data de atualização do registro

Tabela tramitacao

CREATE TABLE tramitacao (
    ID INTEGER IDENTITY(1,1) PRIMARY KEY,
    DESCRICAO VARCHAR(MAX),
    LOCAL_PROPOSICAO VARCHAR(60),
    ID_PROPOSICAO VARCHAR(40),
    DATA_CRIACAO_TRAMITACAO DATETIME,
    DATA_CRIACAO_REGISTRO DATETIME DEFAULT GETDATE(),
    DATA_ATUALIZACAO_REGISTRO DATETIME,
    FOREIGN KEY (ID_PROPOSICAO) REFERENCES proposicao(NUMERO)
);

Dicionário de dados da tabela tramitação

Campo Tipo Descrição
id Incremental ID automático
DATA_CRIACAO_TRAMITACAO Timestamp Data do registro da tramitação, ex. “2022-10-04T00:00:00Z”
DESCRICAO String Descrição do histórico da tramitação, ex. “Proposição lida em Plenário.\nPublicada no DL…”
LOCAL_PROPOSICAO String Local da tramitação, ex. “Plenário”
ID_PROPOSICAO ForeignKey Chave estrangeira que referencia o ID da proposição
DATA_CRIACAO_REGISTRO Timestamp Data da inserção do registro
DATA_ATUALIZACAO_REGISTRO Timestamp Data atualização registro

Tabela log_dag

CREATE TABLE log_dag (
    ID INTEGER IDENTITY(1,1) PRIMARY KEY,
    TIPO_ERROR INTEGER,
    TIPO_LOG VARCHAR(20),
    URL_API VARCHAR(MAX),
    MENSAGEM_LOG VARCHAR(300),
    JSON_XML VARCHAR(MAX),
    JSON_ENVIO VARCHAR(MAX),
    DATA_REGISTRO DATETIME DEFAULT GETDATE()
);

Dicionário de dados da tabela log_dag

CAMPO TIPO DESCRIÇÃO
ID INTEGER ID automático incremental
TIPO_ERROR INTEGER REGISTRA O CÓDIGO TIPO ERROR
TIPO_LOG VARCHAR(20) ERROR, INFO
URL_API VARCHAR(MAX) URL DA API
MENSAGEM_LOG VARCHAR(300) MENSAGEM DE ERRO
JSON_XML VARCHAR(MAX) JSON DA API
JSON_ENVIO VARCHAR(MAX) JSON QUE TENTOU SER GRAVADO NO BANCO
DATA_REGISTRO DATETIME REGISTRA A DATA DE INSERÇÃO DO REGISTRO

Tabela dag_error

CREATE TABLE dag_error (
    ID INTEGER IDENTITY(1,1) PRIMARY KEY,
    TIPO_ERROR INTEGER,
    NUMERO VARCHAR(40),
    URL_API VARCHAR(MAX),
    JSON_XML VARCHAR(MAX),
    JSON_ENVIO VARCHAR(MAX),
    MENSAGEM_ERRO VARCHAR(300),
    DATA_REGISTRO DATETIME DEFAULT GETDATE(),
    DATA_ATUALIZACAO DATETIME
);

Dicionário de dados da tabela dag_error

CAMPO TIPO DESCRIÇÃO
ID INTEGER ID automático incremental
TIPO_ERROR INTEGER REGISTRA O CÓDIGO TIPO ERROR
NUMERO VARCHAR(40) NÚMERO DA PROPOSIÇÃO
URL_API VARCHAR(MAX) URL DA API
JSON_XML VARCHAR(MAX) JSON DA API
JSON_ENVIO VARCHAR(MAX) JSON QUE TENTOU SER GRAVADO NO BANCO
MENSAGEM_ERRO VARCHAR(300) MENSAGEM DE ERRO
DATA_REGISTRO DATETIME DATA DE REGISTRO
DATA_ATUALIZACAO DATETIME DATA DA ATUALIZAÇÃO DO ERRO

5. Fluxo da DAG

Diagrama de Atividade DAG

A DAG inicia verificando a conexão com o banco de dados e a API. Se ambas estiverem ativas, inicia-se o processo ETL para extração e tratamento de dados. Caso algum registro falhe, ele é gravado na tabela dag_error e reprocessado antes da finalização do ETL.

6. Diagrama de Classe

A figura abaixo, mostra uma proposta de diagrama de classe, com destaque para a a conexão da API e banco de dados, podendo ser substituído sem prejuízo ao funcionamento do processo de ETL. DAG


7 - Assistir ao vídeo

Link do reposítório


Sobre Rodrigo Rocha

Hoje trabalho com Pentaho data Integration, SQL (Consulta Simples No Banco de dados) Consumo de API (REST E SOAP)