Logo

Exemplos em Python

Implementações práticas e prontas para usar com Python

Esta página contém exemplos práticos de como integrar com a API Sinapse usando Python.

Instalação

pip install requests python-dotenv

Cliente Base

Primeiro, vamos criar uma classe base para facilitar todas as operações:

import requests
from typing import Dict, List, Optional, Any
import os
from dotenv import load_dotenv

load_dotenv()

class SinapseClient:
    """Cliente para interagir com a API Sinapse"""
    
    def __init__(self, base_url: str = None, token: str = None):
        self.base_url = base_url or os.getenv('SINAPSE_API_URL', 'https://api.sinapse.org.br/v1')
        self.token = token
        self.session = requests.Session()
        
        if self.token:
            self.session.headers.update({
                'Authorization': f'Bearer {self.token}'
            })
    
    def login(self, email: str, senha: str) -> Dict[str, Any]:
        """Realiza login e armazena o token"""
        response = self.session.post(
            f"{self.base_url}/auth/login",
            json={"email": email, "senha": senha}
        )
        
        if response.status_code == 200:
            data = response.json()
            self.token = data['access_token']
            self.session.headers.update({
                'Authorization': f'Bearer {self.token}'
            })
            return data
        else:
            raise Exception(f"Erro no login: {response.status_code} - {response.text}")
    
    def get(self, endpoint: str, params: Dict = None) -> Any:
        """Realiza requisição GET"""
        response = self.session.get(f"{self.base_url}{endpoint}", params=params)
        response.raise_for_status()
        return response.json()
    
    def post(self, endpoint: str, data: Dict = None) -> Any:
        """Realiza requisição POST"""
        response = self.session.post(f"{self.base_url}{endpoint}", json=data)
        response.raise_for_status()
        return response.json()
    
    def put(self, endpoint: str, data: Dict = None) -> Any:
        """Realiza requisição PUT"""
        response = self.session.put(f"{self.base_url}{endpoint}", json=data)
        response.raise_for_status()
        return response.json()
    
    def delete(self, endpoint: str) -> Any:
        """Realiza requisição DELETE"""
        response = self.session.delete(f"{self.base_url}{endpoint}")
        response.raise_for_status()
        return response.json() if response.content else None

Autenticação e Gestão de Sessão

Login com Renovação Automática

import time
from datetime import datetime, timedelta
import jwt

class SinapseAuthClient(SinapseClient):
    """Cliente com gerenciamento automático de tokens"""
    
    def __init__(self, email: str, senha: str):
        super().__init__()
        self.email = email
        self.senha = senha
        self.refresh_token = None
        self.token_expires_at = None
        self.login(email, senha)
    
    def login(self, email: str, senha: str) -> Dict[str, Any]:
        """Login com armazenamento de refresh token"""
        data = super().login(email, senha)
        self.refresh_token = data['refresh_token']
        
        # Decodificar token para pegar expiração
        decoded = jwt.decode(self.token, options={"verify_signature": False})
        self.token_expires_at = datetime.fromtimestamp(decoded['exp'])
        
        return data
    
    def _check_token(self):
        """Verifica e renova token se necessário"""
        if datetime.now() >= self.token_expires_at - timedelta(minutes=5):
            self._refresh_token()
    
    def _refresh_token(self):
        """Renova o token de acesso"""
        response = self.session.post(
            f"{self.base_url}/auth/refresh",
            json={"refresh_token": self.refresh_token}
        )
        
        if response.status_code == 200:
            data = response.json()
            self.token = data['access_token']
            self.session.headers.update({
                'Authorization': f'Bearer {self.token}'
            })
            
            decoded = jwt.decode(self.token, options={"verify_signature": False})
            self.token_expires_at = datetime.fromtimestamp(decoded['exp'])
        else:
            # Se refresh falhar, fazer login novamente
            self.login(self.email, self.senha)
    
    def get(self, endpoint: str, params: Dict = None) -> Any:
        """GET com verificação de token"""
        self._check_token()
        return super().get(endpoint, params)
    
    def post(self, endpoint: str, data: Dict = None) -> Any:
        """POST com verificação de token"""
        self._check_token()
        return super().post(endpoint, data)

# Uso
client = SinapseAuthClient("[email protected]", "SenhaForte123!")
perfil = client.get("/usuarios/me")
print(f"Logado como: {perfil['nome']}")

Gestão de Usuários

CRUD Completo de Usuários

class GestorUsuarios:
    """Gerenciador de usuários Sinapse"""
    
    def __init__(self, client: SinapseClient):
        self.client = client
    
    def criar_usuario(self, dados_usuario: Dict) -> Dict:
        """Cria novo usuário"""
        # Validar dados obrigatórios
        campos_obrigatorios = ['email', 'senha', 'nome', 'cpf']
        for campo in campos_obrigatorios:
            if campo not in dados_usuario:
                raise ValueError(f"Campo obrigatório ausente: {campo}")
        
        # Validar CPF
        if not self._validar_cpf(dados_usuario['cpf']):
            raise ValueError("CPF inválido")
        
        # Validar senha
        if not self._validar_senha(dados_usuario['senha']):
            raise ValueError("Senha não atende aos requisitos mínimos")
        
        return self.client.post("/usuarios", dados_usuario)
    
    def buscar_usuarios(self, filtros: Dict = None) -> List[Dict]:
        """Busca usuários com filtros"""
        params = filtros or {}
        params.setdefault('page', 1)
        params.setdefault('size', 50)
        
        todos_usuarios = []
        
        while True:
            response = self.client.get("/usuarios", params)
            todos_usuarios.extend(response['items'])
            
            if params['page'] >= response['pages']:
                break
            
            params['page'] += 1
        
        return todos_usuarios
    
    def atualizar_usuario(self, user_id: int, dados: Dict) -> Dict:
        """Atualiza dados do usuário"""
        return self.client.put(f"/usuarios/{user_id}", dados)
    
    def desativar_usuario(self, user_id: int, motivo: str) -> bool:
        """Desativa um usuário"""
        return self.client.delete(f"/usuarios/{user_id}?motivo={motivo}")
    
    def atribuir_grupo(self, user_id: int, grupo_id: int) -> bool:
        """Adiciona usuário a um grupo"""
        return self.client.post(f"/usuarios/{user_id}/grupos/{grupo_id}")
    
    def _validar_cpf(self, cpf: str) -> bool:
        """Valida CPF brasileiro"""
        cpf = ''.join(filter(str.isdigit, cpf))
        
        if len(cpf) != 11:
            return False
        
        # Validação do dígito verificador
        for i in range(9, 11):
            value = sum((int(cpf[num]) * ((i+1) - num) for num in range(0, i)))
            digit = ((value * 10) % 11) % 10
            if digit != int(cpf[i]):
                return False
        
        return True
    
    def _validar_senha(self, senha: str) -> bool:
        """Valida requisitos de senha"""
        import re
        
        if len(senha) < 8:
            return False
        if not re.search(r"[A-Z]", senha):
            return False
        if not re.search(r"[a-z]", senha):
            return False
        if not re.search(r"[0-9]", senha):
            return False
        if not re.search(r"[^A-Za-z0-9]", senha):
            return False
        
        return True

# Exemplo de uso
gestor = GestorUsuarios(client)

# Criar usuário
novo_usuario = gestor.criar_usuario({
    "email": "[email protected]",
    "senha": "SenhaForte123!",
    "nome": "Novo Usuário",
    "cpf": "123.456.789-00",
    "telefone": "(11) 98765-4321",
    "cargo": "Analista"
})

# Buscar usuários ativos em SP
usuarios_sp = gestor.buscar_usuarios({
    "estado": "SP",
    "ativo": True,
    "size": 100
})

print(f"Encontrados {len(usuarios_sp)} usuários ativos em SP")

Análise de Agravos

Dashboard de Monitoramento

import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Tuple

class MonitorAgravos:
    """Monitor de agravos epidemiológicos"""
    
    def __init__(self, client: SinapseClient):
        self.client = client
    
    def analisar_surto(self, 
                      tipo_agravo: str, 
                      municipio: str,
                      dias: int = 30) -> Dict:
        """Analisa possível surto em município"""
        
        # Buscar casos
        data_fim = datetime.now()
        data_inicio = data_fim - timedelta(days=dias)
        
        casos = self.client.get("/agravos/casos", {
            "agravo_codigo": tipo_agravo,
            "municipio_residencia": municipio,
            "data_inicio": data_inicio.isoformat(),
            "data_fim": data_fim.isoformat(),
            "size": 1000
        })
        
        # Converter para DataFrame
        df = pd.DataFrame(casos['items'])
        
        if df.empty:
            return {"alerta": False, "mensagem": "Sem casos no período"}
        
        # Análise temporal
        df['data_notificacao'] = pd.to_datetime(df['data_notificacao'])
        df['semana'] = df['data_notificacao'].dt.isocalendar().week
        
        # Casos por semana
        casos_semanais = df.groupby('semana').size()
        
        # Calcular tendência
        if len(casos_semanais) >= 4:
            # Média móvel
            media_movel = casos_semanais.rolling(window=2).mean()
            
            # Taxa de crescimento
            crescimento = (casos_semanais.iloc[-1] / casos_semanais.iloc[-2] - 1) * 100
            
            # Critérios de alerta
            alerta = (
                crescimento > 50 or  # Crescimento > 50%
                casos_semanais.iloc[-1] > casos_semanais.mean() * 2  # Acima de 2x a média
            )
        else:
            alerta = False
            crescimento = 0
        
        # Análise espacial
        bairros_afetados = df['bairro'].value_counts()
        
        return {
            "alerta": alerta,
            "total_casos": len(df),
            "casos_ultima_semana": int(casos_semanais.iloc[-1]) if not casos_semanais.empty else 0,
            "crescimento_percentual": round(crescimento, 1),
            "bairros_mais_afetados": bairros_afetados.head(5).to_dict(),
            "classificacao_final": df['classificacao_final'].value_counts().to_dict(),
            "serie_temporal": casos_semanais.to_dict()
        }
    
    def gerar_relatorio_estado(self, estado: str, tipo_agravo: str = "dengue") -> pd.DataFrame:
        """Gera relatório consolidado por estado"""
        
        # Buscar estatísticas
        stats = self.client.get("/agravos/casos/estatisticas", {
            "estado": estado,
            "agravo_codigo": tipo_agravo,
            "agregacao": "municipio",
            "periodo": "mes_atual"
        })
        
        # Criar DataFrame
        df_municipios = pd.DataFrame(stats['por_municipio'])
        
        # Calcular métricas
        df_municipios['casos_por_100k'] = (
            df_municipios['total_casos'] / df_municipios['populacao'] * 100000
        )
        
        df_municipios['risco'] = pd.cut(
            df_municipios['casos_por_100k'],
            bins=[0, 50, 100, 300, float('inf')],
            labels=['Baixo', 'Médio', 'Alto', 'Muito Alto']
        )
        
        # Ordenar por risco
        df_municipios = df_municipios.sort_values('casos_por_100k', ascending=False)
        
        return df_municipios
    
    def alertas_automaticos(self, configuracoes: List[Dict]) -> List[Dict]:
        """Verifica alertas configurados"""
        
        alertas_disparados = []
        
        for config in configuracoes:
            resultado = self.analisar_surto(
                tipo_agravo=config['tipo_agravo'],
                municipio=config['municipio'],
                dias=config.get('dias', 30)
            )
            
            if resultado['alerta']:
                alerta = {
                    "timestamp": datetime.now().isoformat(),
                    "municipio": config['municipio'],
                    "tipo_agravo": config['tipo_agravo'],
                    "mensagem": f"Possível surto detectado: {resultado['crescimento_percentual']}% de crescimento",
                    "dados": resultado
                }
                
                alertas_disparados.append(alerta)
                
                # Enviar notificação (webhook, email, etc)
                if 'webhook_url' in config:
                    self._enviar_webhook(config['webhook_url'], alerta)
        
        return alertas_disparados
    
    def _enviar_webhook(self, url: str, dados: Dict):
        """Envia alerta via webhook"""
        try:
            response = requests.post(url, json=dados, timeout=10)
            response.raise_for_status()
        except Exception as e:
            print(f"Erro ao enviar webhook: {e}")

# Exemplo de uso
monitor = MonitorAgravos(client)

# Analisar surto
analise = monitor.analisar_surto("dengue", "São Paulo", dias=30)
if analise['alerta']:
    print(f"ALERTA: Crescimento de {analise['crescimento_percentual']}%")
    print(f"Total de casos: {analise['total_casos']}")
    print(f"Bairros mais afetados: {list(analise['bairros_mais_afetados'].keys())[:3]}")

# Relatório estadual
df_relatorio = monitor.gerar_relatorio_estado("SP", "dengue")
print("\nMunicípios de maior risco:")
print(df_relatorio[['municipio', 'total_casos', 'casos_por_100k', 'risco']].head(10))

Sincronização de Dados

Importação em Lote

import csv
from concurrent.futures import ThreadPoolExecutor
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ImportadorDados:
    """Importador de dados em lote para Sinapse"""
    
    def __init__(self, client: SinapseClient, max_workers: int = 5):
        self.client = client
        self.max_workers = max_workers
    
    def importar_casos_csv(self, arquivo_csv: str) -> Dict:
        """Importa casos de agravo de arquivo CSV"""
        
        casos_para_importar = []
        
        # Ler CSV
        with open(arquivo_csv, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            
            for row in reader:
                caso = self._mapear_caso_csv(row)
                if caso:
                    casos_para_importar.append(caso)
        
        logger.info(f"Lidos {len(casos_para_importar)} casos do CSV")
        
        # Importar em paralelo
        resultados = {
            "sucesso": 0,
            "erros": [],
            "duplicados": 0
        }
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            
            for lote in self._dividir_em_lotes(casos_para_importar, 100):
                future = executor.submit(self._importar_lote, lote)
                futures.append(future)
            
            for future in futures:
                resultado_lote = future.result()
                resultados["sucesso"] += resultado_lote["sucesso"]
                resultados["erros"].extend(resultado_lote["erros"])
                resultados["duplicados"] += resultado_lote["duplicados"]
        
        logger.info(f"Importação concluída: {resultados['sucesso']} sucessos, "
                   f"{len(resultados['erros'])} erros, {resultados['duplicados']} duplicados")
        
        return resultados
    
    def _mapear_caso_csv(self, row: Dict) -> Optional[Dict]:
        """Mapeia linha do CSV para formato da API"""
        
        try:
            # Mapear campos
            caso = {
                "agravo_codigo": row.get("tipo_agravo", "").lower(),
                "numero_notificacao": row["numero_notificacao"],
                "data_notificacao": self._formatar_data(row["data_notificacao"]),
                "data_primeiros_sintomas": self._formatar_data(row.get("data_sintomas")),
                "paciente_nome": row["nome_paciente"],
                "paciente_idade": int(row.get("idade", 0)),
                "paciente_sexo": row.get("sexo", "").upper(),
                "municipio_residencia": row["municipio"],
                "estado_residencia": row.get("uf", "SP"),
                "bairro": row.get("bairro", ""),
                "classificacao_inicial": row.get("classificacao", "suspeito")
            }
            
            # Adicionar sintomas se disponíveis
            sintomas = {}
            for sintoma in ["febre", "cefaleia", "mialgia", "nausea"]:
                if sintoma in row:
                    sintomas[sintoma] = row[sintoma].lower() in ["sim", "s", "true", "1"]
            
            if sintomas:
                caso["sintomas"] = sintomas
            
            return caso
            
        except Exception as e:
            logger.error(f"Erro ao mapear caso {row.get('numero_notificacao', 'DESCONHECIDO')}: {e}")
            return None
    
    def _importar_lote(self, casos: List[Dict]) -> Dict:
        """Importa um lote de casos"""
        
        resultado = {
            "sucesso": 0,
            "erros": [],
            "duplicados": 0
        }
        
        try:
            # Usar endpoint de importação em massa se disponível
            response = self.client.post("/agravos/casos/bulk", {"casos": casos})
            
            resultado["sucesso"] = response.get("importados", 0)
            resultado["duplicados"] = response.get("duplicados", 0)
            resultado["erros"] = response.get("erros", [])
            
        except Exception as e:
            # Se bulk falhar, tentar um por um
            for caso in casos:
                try:
                    self.client.post("/agravos/casos", caso)
                    resultado["sucesso"] += 1
                except Exception as erro:
                    if "duplicado" in str(erro).lower():
                        resultado["duplicados"] += 1
                    else:
                        resultado["erros"].append({
                            "numero_notificacao": caso.get("numero_notificacao"),
                            "erro": str(erro)
                        })
        
        return resultado
    
    def _dividir_em_lotes(self, lista: List, tamanho: int) -> List[List]:
        """Divide lista em lotes menores"""
        for i in range(0, len(lista), tamanho):
            yield lista[i:i + tamanho]
    
    def _formatar_data(self, data_str: str) -> str:
        """Formata data para ISO 8601"""
        from dateutil import parser
        
        try:
            data = parser.parse(data_str)
            return data.isoformat()
        except:
            return datetime.now().isoformat()

# Exemplo de uso
importador = ImportadorDados(client)
resultado = importador.importar_casos_csv("casos_dengue_2025.csv")

# Salvar log de erros
if resultado['erros']:
    with open('erros_importacao.json', 'w') as f:
        json.dump(resultado['erros'], f, indent=2)

Automação e Integração

Bot de Monitoramento

import asyncio
import aiohttp
from typing import List, Dict
import json

class BotMonitoramentoSinapse:
    """Bot assíncrono para monitoramento contínuo"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.base_url = config['api_url']
        self.token = None
        self.session = None
    
    async def iniciar(self):
        """Inicia o bot"""
        self.session = aiohttp.ClientSession()
        await self._autenticar()
        
        # Criar tarefas de monitoramento
        tarefas = [
            self._monitorar_novos_casos(),
            self._verificar_alertas(),
            self._atualizar_dashboard()
        ]
        
        await asyncio.gather(*tarefas)
    
    async def parar(self):
        """Para o bot"""
        if self.session:
            await self.session.close()
    
    async def _autenticar(self):
        """Autentica na API"""
        async with self.session.post(
            f"{self.base_url}/auth/login",
            json={
                "email": self.config['email'],
                "senha": self.config['senha']
            }
        ) as response:
            data = await response.json()
            self.token = data['access_token']
    
    async def _monitorar_novos_casos(self):
        """Monitora novos casos em tempo real"""
        
        ultimo_id = 0
        
        while True:
            try:
                # Buscar novos casos
                headers = {"Authorization": f"Bearer {self.token}"}
                
                async with self.session.get(
                    f"{self.base_url}/agravos/casos",
                    headers=headers,
                    params={
                        "id_maior_que": ultimo_id,
                        "size": 100,
                        "sort": "id",
                        "order": "asc"
                    }
                ) as response:
                    data = await response.json()
                    
                    novos_casos = data.get('items', [])
                    
                    if novos_casos:
                        ultimo_id = novos_casos[-1]['id']
                        
                        # Processar novos casos
                        for caso in novos_casos:
                            await self._processar_novo_caso(caso)
                
                # Aguardar antes da próxima verificação
                await asyncio.sleep(self.config.get('intervalo_checagem', 60))
                
            except Exception as e:
                logger.error(f"Erro no monitoramento: {e}")
                await asyncio.sleep(60)
    
    async def _processar_novo_caso(self, caso: Dict):
        """Processa um novo caso detectado"""
        
        # Verificar critérios de alerta
        if self._caso_requer_alerta(caso):
            await self._enviar_alerta(caso)
        
        # Atualizar estatísticas
        await self._atualizar_estatisticas(caso)
        
        # Log
        logger.info(f"Novo caso processado: {caso['numero_notificacao']} - "
                   f"{caso['agravo_codigo']} em {caso['municipio_residencia']}")
    
    def _caso_requer_alerta(self, caso: Dict) -> bool:
        """Verifica se caso requer alerta imediato"""
        
        # Critérios de alerta
        criterios = self.config.get('criterios_alerta', {})
        
        # Verificar óbito
        if caso.get('evolucao') == 'obito':
            return True
        
        # Verificar municípios prioritários
        if caso['municipio_residencia'] in criterios.get('municipios_prioritarios', []):
            return True
        
        # Verificar agravos prioritários
        if caso['agravo_codigo'] in criterios.get('agravos_prioritarios', []):
            return True
        
        return False
    
    async def _enviar_alerta(self, caso: Dict):
        """Envia alerta sobre caso"""
        
        mensagem = {
            "tipo": "novo_caso_critico",
            "timestamp": datetime.now().isoformat(),
            "caso": {
                "numero": caso['numero_notificacao'],
                "agravo": caso['agravo_codigo'],
                "municipio": caso['municipio_residencia'],
                "evolucao": caso.get('evolucao', 'em_tratamento')
            }
        }
        
        # Enviar para diferentes canais
        for canal in self.config.get('canais_alerta', []):
            if canal['tipo'] == 'webhook':
                await self._enviar_webhook(canal['url'], mensagem)
            elif canal['tipo'] == 'email':
                await self._enviar_email(canal['endereco'], mensagem)

# Configuração e execução
config = {
    "api_url": "https://api.sinapse.org.br/v1",
    "email": "[email protected]",
    "senha": "SenhaForteBot123!",
    "intervalo_checagem": 30,  # segundos
    "criterios_alerta": {
        "municipios_prioritarios": ["São Paulo", "Rio de Janeiro"],
        "agravos_prioritarios": ["dengue_hemorragica", "covid19"]
    },
    "canais_alerta": [
        {
            "tipo": "webhook",
            "url": "https://hooks.slack.com/services/xxx"
        }
    ]
}

# Executar bot
async def main():
    bot = BotMonitoramentoSinapse(config)
    try:
        await bot.iniciar()
    finally:
        await bot.parar()

# asyncio.run(main())

Próximos Passos

On this page