Exemplos em Python
Implementações práticas e prontas para usar com Python
Esta página contém exemplos práticos de como integrar com a API Sinapse usando Python.
Instalação
pip install requests python-dotenvCliente Base
Primeiro, vamos criar uma classe base para facilitar todas as operações:
import requests
from typing import Dict, List, Optional, Any
import os
from dotenv import load_dotenv
load_dotenv()
class SinapseClient:
"""Cliente para interagir com a API Sinapse"""
def __init__(self, base_url: str = None, token: str = None):
self.base_url = base_url or os.getenv('SINAPSE_API_URL', 'https://api.sinapse.org.br/v1')
self.token = token
self.session = requests.Session()
if self.token:
self.session.headers.update({
'Authorization': f'Bearer {self.token}'
})
def login(self, email: str, senha: str) -> Dict[str, Any]:
"""Realiza login e armazena o token"""
response = self.session.post(
f"{self.base_url}/auth/login",
json={"email": email, "senha": senha}
)
if response.status_code == 200:
data = response.json()
self.token = data['access_token']
self.session.headers.update({
'Authorization': f'Bearer {self.token}'
})
return data
else:
raise Exception(f"Erro no login: {response.status_code} - {response.text}")
def get(self, endpoint: str, params: Dict = None) -> Any:
"""Realiza requisição GET"""
response = self.session.get(f"{self.base_url}{endpoint}", params=params)
response.raise_for_status()
return response.json()
def post(self, endpoint: str, data: Dict = None) -> Any:
"""Realiza requisição POST"""
response = self.session.post(f"{self.base_url}{endpoint}", json=data)
response.raise_for_status()
return response.json()
def put(self, endpoint: str, data: Dict = None) -> Any:
"""Realiza requisição PUT"""
response = self.session.put(f"{self.base_url}{endpoint}", json=data)
response.raise_for_status()
return response.json()
def delete(self, endpoint: str) -> Any:
"""Realiza requisição DELETE"""
response = self.session.delete(f"{self.base_url}{endpoint}")
response.raise_for_status()
return response.json() if response.content else NoneAutenticação e Gestão de Sessão
Login com Renovação Automática
import time
from datetime import datetime, timedelta
import jwt
class SinapseAuthClient(SinapseClient):
"""Cliente com gerenciamento automático de tokens"""
def __init__(self, email: str, senha: str):
super().__init__()
self.email = email
self.senha = senha
self.refresh_token = None
self.token_expires_at = None
self.login(email, senha)
def login(self, email: str, senha: str) -> Dict[str, Any]:
"""Login com armazenamento de refresh token"""
data = super().login(email, senha)
self.refresh_token = data['refresh_token']
# Decodificar token para pegar expiração
decoded = jwt.decode(self.token, options={"verify_signature": False})
self.token_expires_at = datetime.fromtimestamp(decoded['exp'])
return data
def _check_token(self):
"""Verifica e renova token se necessário"""
if datetime.now() >= self.token_expires_at - timedelta(minutes=5):
self._refresh_token()
def _refresh_token(self):
"""Renova o token de acesso"""
response = self.session.post(
f"{self.base_url}/auth/refresh",
json={"refresh_token": self.refresh_token}
)
if response.status_code == 200:
data = response.json()
self.token = data['access_token']
self.session.headers.update({
'Authorization': f'Bearer {self.token}'
})
decoded = jwt.decode(self.token, options={"verify_signature": False})
self.token_expires_at = datetime.fromtimestamp(decoded['exp'])
else:
# Se refresh falhar, fazer login novamente
self.login(self.email, self.senha)
def get(self, endpoint: str, params: Dict = None) -> Any:
"""GET com verificação de token"""
self._check_token()
return super().get(endpoint, params)
def post(self, endpoint: str, data: Dict = None) -> Any:
"""POST com verificação de token"""
self._check_token()
return super().post(endpoint, data)
# Uso
client = SinapseAuthClient("[email protected]", "SenhaForte123!")
perfil = client.get("/usuarios/me")
print(f"Logado como: {perfil['nome']}")Gestão de Usuários
CRUD Completo de Usuários
class GestorUsuarios:
"""Gerenciador de usuários Sinapse"""
def __init__(self, client: SinapseClient):
self.client = client
def criar_usuario(self, dados_usuario: Dict) -> Dict:
"""Cria novo usuário"""
# Validar dados obrigatórios
campos_obrigatorios = ['email', 'senha', 'nome', 'cpf']
for campo in campos_obrigatorios:
if campo not in dados_usuario:
raise ValueError(f"Campo obrigatório ausente: {campo}")
# Validar CPF
if not self._validar_cpf(dados_usuario['cpf']):
raise ValueError("CPF inválido")
# Validar senha
if not self._validar_senha(dados_usuario['senha']):
raise ValueError("Senha não atende aos requisitos mínimos")
return self.client.post("/usuarios", dados_usuario)
def buscar_usuarios(self, filtros: Dict = None) -> List[Dict]:
"""Busca usuários com filtros"""
params = filtros or {}
params.setdefault('page', 1)
params.setdefault('size', 50)
todos_usuarios = []
while True:
response = self.client.get("/usuarios", params)
todos_usuarios.extend(response['items'])
if params['page'] >= response['pages']:
break
params['page'] += 1
return todos_usuarios
def atualizar_usuario(self, user_id: int, dados: Dict) -> Dict:
"""Atualiza dados do usuário"""
return self.client.put(f"/usuarios/{user_id}", dados)
def desativar_usuario(self, user_id: int, motivo: str) -> bool:
"""Desativa um usuário"""
return self.client.delete(f"/usuarios/{user_id}?motivo={motivo}")
def atribuir_grupo(self, user_id: int, grupo_id: int) -> bool:
"""Adiciona usuário a um grupo"""
return self.client.post(f"/usuarios/{user_id}/grupos/{grupo_id}")
def _validar_cpf(self, cpf: str) -> bool:
"""Valida CPF brasileiro"""
cpf = ''.join(filter(str.isdigit, cpf))
if len(cpf) != 11:
return False
# Validação do dígito verificador
for i in range(9, 11):
value = sum((int(cpf[num]) * ((i+1) - num) for num in range(0, i)))
digit = ((value * 10) % 11) % 10
if digit != int(cpf[i]):
return False
return True
def _validar_senha(self, senha: str) -> bool:
"""Valida requisitos de senha"""
import re
if len(senha) < 8:
return False
if not re.search(r"[A-Z]", senha):
return False
if not re.search(r"[a-z]", senha):
return False
if not re.search(r"[0-9]", senha):
return False
if not re.search(r"[^A-Za-z0-9]", senha):
return False
return True
# Exemplo de uso
gestor = GestorUsuarios(client)
# Criar usuário
novo_usuario = gestor.criar_usuario({
"email": "[email protected]",
"senha": "SenhaForte123!",
"nome": "Novo Usuário",
"cpf": "123.456.789-00",
"telefone": "(11) 98765-4321",
"cargo": "Analista"
})
# Buscar usuários ativos em SP
usuarios_sp = gestor.buscar_usuarios({
"estado": "SP",
"ativo": True,
"size": 100
})
print(f"Encontrados {len(usuarios_sp)} usuários ativos em SP")Análise de Agravos
Dashboard de Monitoramento
import pandas as pd
from datetime import datetime, timedelta
from typing import List, Dict, Tuple
class MonitorAgravos:
"""Monitor de agravos epidemiológicos"""
def __init__(self, client: SinapseClient):
self.client = client
def analisar_surto(self,
tipo_agravo: str,
municipio: str,
dias: int = 30) -> Dict:
"""Analisa possível surto em município"""
# Buscar casos
data_fim = datetime.now()
data_inicio = data_fim - timedelta(days=dias)
casos = self.client.get("/agravos/casos", {
"agravo_codigo": tipo_agravo,
"municipio_residencia": municipio,
"data_inicio": data_inicio.isoformat(),
"data_fim": data_fim.isoformat(),
"size": 1000
})
# Converter para DataFrame
df = pd.DataFrame(casos['items'])
if df.empty:
return {"alerta": False, "mensagem": "Sem casos no período"}
# Análise temporal
df['data_notificacao'] = pd.to_datetime(df['data_notificacao'])
df['semana'] = df['data_notificacao'].dt.isocalendar().week
# Casos por semana
casos_semanais = df.groupby('semana').size()
# Calcular tendência
if len(casos_semanais) >= 4:
# Média móvel
media_movel = casos_semanais.rolling(window=2).mean()
# Taxa de crescimento
crescimento = (casos_semanais.iloc[-1] / casos_semanais.iloc[-2] - 1) * 100
# Critérios de alerta
alerta = (
crescimento > 50 or # Crescimento > 50%
casos_semanais.iloc[-1] > casos_semanais.mean() * 2 # Acima de 2x a média
)
else:
alerta = False
crescimento = 0
# Análise espacial
bairros_afetados = df['bairro'].value_counts()
return {
"alerta": alerta,
"total_casos": len(df),
"casos_ultima_semana": int(casos_semanais.iloc[-1]) if not casos_semanais.empty else 0,
"crescimento_percentual": round(crescimento, 1),
"bairros_mais_afetados": bairros_afetados.head(5).to_dict(),
"classificacao_final": df['classificacao_final'].value_counts().to_dict(),
"serie_temporal": casos_semanais.to_dict()
}
def gerar_relatorio_estado(self, estado: str, tipo_agravo: str = "dengue") -> pd.DataFrame:
"""Gera relatório consolidado por estado"""
# Buscar estatísticas
stats = self.client.get("/agravos/casos/estatisticas", {
"estado": estado,
"agravo_codigo": tipo_agravo,
"agregacao": "municipio",
"periodo": "mes_atual"
})
# Criar DataFrame
df_municipios = pd.DataFrame(stats['por_municipio'])
# Calcular métricas
df_municipios['casos_por_100k'] = (
df_municipios['total_casos'] / df_municipios['populacao'] * 100000
)
df_municipios['risco'] = pd.cut(
df_municipios['casos_por_100k'],
bins=[0, 50, 100, 300, float('inf')],
labels=['Baixo', 'Médio', 'Alto', 'Muito Alto']
)
# Ordenar por risco
df_municipios = df_municipios.sort_values('casos_por_100k', ascending=False)
return df_municipios
def alertas_automaticos(self, configuracoes: List[Dict]) -> List[Dict]:
"""Verifica alertas configurados"""
alertas_disparados = []
for config in configuracoes:
resultado = self.analisar_surto(
tipo_agravo=config['tipo_agravo'],
municipio=config['municipio'],
dias=config.get('dias', 30)
)
if resultado['alerta']:
alerta = {
"timestamp": datetime.now().isoformat(),
"municipio": config['municipio'],
"tipo_agravo": config['tipo_agravo'],
"mensagem": f"Possível surto detectado: {resultado['crescimento_percentual']}% de crescimento",
"dados": resultado
}
alertas_disparados.append(alerta)
# Enviar notificação (webhook, email, etc)
if 'webhook_url' in config:
self._enviar_webhook(config['webhook_url'], alerta)
return alertas_disparados
def _enviar_webhook(self, url: str, dados: Dict):
"""Envia alerta via webhook"""
try:
response = requests.post(url, json=dados, timeout=10)
response.raise_for_status()
except Exception as e:
print(f"Erro ao enviar webhook: {e}")
# Exemplo de uso
monitor = MonitorAgravos(client)
# Analisar surto
analise = monitor.analisar_surto("dengue", "São Paulo", dias=30)
if analise['alerta']:
print(f"ALERTA: Crescimento de {analise['crescimento_percentual']}%")
print(f"Total de casos: {analise['total_casos']}")
print(f"Bairros mais afetados: {list(analise['bairros_mais_afetados'].keys())[:3]}")
# Relatório estadual
df_relatorio = monitor.gerar_relatorio_estado("SP", "dengue")
print("\nMunicípios de maior risco:")
print(df_relatorio[['municipio', 'total_casos', 'casos_por_100k', 'risco']].head(10))Sincronização de Dados
Importação em Lote
import csv
from concurrent.futures import ThreadPoolExecutor
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ImportadorDados:
"""Importador de dados em lote para Sinapse"""
def __init__(self, client: SinapseClient, max_workers: int = 5):
self.client = client
self.max_workers = max_workers
def importar_casos_csv(self, arquivo_csv: str) -> Dict:
"""Importa casos de agravo de arquivo CSV"""
casos_para_importar = []
# Ler CSV
with open(arquivo_csv, 'r', encoding='utf-8') as file:
reader = csv.DictReader(file)
for row in reader:
caso = self._mapear_caso_csv(row)
if caso:
casos_para_importar.append(caso)
logger.info(f"Lidos {len(casos_para_importar)} casos do CSV")
# Importar em paralelo
resultados = {
"sucesso": 0,
"erros": [],
"duplicados": 0
}
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = []
for lote in self._dividir_em_lotes(casos_para_importar, 100):
future = executor.submit(self._importar_lote, lote)
futures.append(future)
for future in futures:
resultado_lote = future.result()
resultados["sucesso"] += resultado_lote["sucesso"]
resultados["erros"].extend(resultado_lote["erros"])
resultados["duplicados"] += resultado_lote["duplicados"]
logger.info(f"Importação concluída: {resultados['sucesso']} sucessos, "
f"{len(resultados['erros'])} erros, {resultados['duplicados']} duplicados")
return resultados
def _mapear_caso_csv(self, row: Dict) -> Optional[Dict]:
"""Mapeia linha do CSV para formato da API"""
try:
# Mapear campos
caso = {
"agravo_codigo": row.get("tipo_agravo", "").lower(),
"numero_notificacao": row["numero_notificacao"],
"data_notificacao": self._formatar_data(row["data_notificacao"]),
"data_primeiros_sintomas": self._formatar_data(row.get("data_sintomas")),
"paciente_nome": row["nome_paciente"],
"paciente_idade": int(row.get("idade", 0)),
"paciente_sexo": row.get("sexo", "").upper(),
"municipio_residencia": row["municipio"],
"estado_residencia": row.get("uf", "SP"),
"bairro": row.get("bairro", ""),
"classificacao_inicial": row.get("classificacao", "suspeito")
}
# Adicionar sintomas se disponíveis
sintomas = {}
for sintoma in ["febre", "cefaleia", "mialgia", "nausea"]:
if sintoma in row:
sintomas[sintoma] = row[sintoma].lower() in ["sim", "s", "true", "1"]
if sintomas:
caso["sintomas"] = sintomas
return caso
except Exception as e:
logger.error(f"Erro ao mapear caso {row.get('numero_notificacao', 'DESCONHECIDO')}: {e}")
return None
def _importar_lote(self, casos: List[Dict]) -> Dict:
"""Importa um lote de casos"""
resultado = {
"sucesso": 0,
"erros": [],
"duplicados": 0
}
try:
# Usar endpoint de importação em massa se disponível
response = self.client.post("/agravos/casos/bulk", {"casos": casos})
resultado["sucesso"] = response.get("importados", 0)
resultado["duplicados"] = response.get("duplicados", 0)
resultado["erros"] = response.get("erros", [])
except Exception as e:
# Se bulk falhar, tentar um por um
for caso in casos:
try:
self.client.post("/agravos/casos", caso)
resultado["sucesso"] += 1
except Exception as erro:
if "duplicado" in str(erro).lower():
resultado["duplicados"] += 1
else:
resultado["erros"].append({
"numero_notificacao": caso.get("numero_notificacao"),
"erro": str(erro)
})
return resultado
def _dividir_em_lotes(self, lista: List, tamanho: int) -> List[List]:
"""Divide lista em lotes menores"""
for i in range(0, len(lista), tamanho):
yield lista[i:i + tamanho]
def _formatar_data(self, data_str: str) -> str:
"""Formata data para ISO 8601"""
from dateutil import parser
try:
data = parser.parse(data_str)
return data.isoformat()
except:
return datetime.now().isoformat()
# Exemplo de uso
importador = ImportadorDados(client)
resultado = importador.importar_casos_csv("casos_dengue_2025.csv")
# Salvar log de erros
if resultado['erros']:
with open('erros_importacao.json', 'w') as f:
json.dump(resultado['erros'], f, indent=2)Automação e Integração
Bot de Monitoramento
import asyncio
import aiohttp
from typing import List, Dict
import json
class BotMonitoramentoSinapse:
"""Bot assíncrono para monitoramento contínuo"""
def __init__(self, config: Dict):
self.config = config
self.base_url = config['api_url']
self.token = None
self.session = None
async def iniciar(self):
"""Inicia o bot"""
self.session = aiohttp.ClientSession()
await self._autenticar()
# Criar tarefas de monitoramento
tarefas = [
self._monitorar_novos_casos(),
self._verificar_alertas(),
self._atualizar_dashboard()
]
await asyncio.gather(*tarefas)
async def parar(self):
"""Para o bot"""
if self.session:
await self.session.close()
async def _autenticar(self):
"""Autentica na API"""
async with self.session.post(
f"{self.base_url}/auth/login",
json={
"email": self.config['email'],
"senha": self.config['senha']
}
) as response:
data = await response.json()
self.token = data['access_token']
async def _monitorar_novos_casos(self):
"""Monitora novos casos em tempo real"""
ultimo_id = 0
while True:
try:
# Buscar novos casos
headers = {"Authorization": f"Bearer {self.token}"}
async with self.session.get(
f"{self.base_url}/agravos/casos",
headers=headers,
params={
"id_maior_que": ultimo_id,
"size": 100,
"sort": "id",
"order": "asc"
}
) as response:
data = await response.json()
novos_casos = data.get('items', [])
if novos_casos:
ultimo_id = novos_casos[-1]['id']
# Processar novos casos
for caso in novos_casos:
await self._processar_novo_caso(caso)
# Aguardar antes da próxima verificação
await asyncio.sleep(self.config.get('intervalo_checagem', 60))
except Exception as e:
logger.error(f"Erro no monitoramento: {e}")
await asyncio.sleep(60)
async def _processar_novo_caso(self, caso: Dict):
"""Processa um novo caso detectado"""
# Verificar critérios de alerta
if self._caso_requer_alerta(caso):
await self._enviar_alerta(caso)
# Atualizar estatísticas
await self._atualizar_estatisticas(caso)
# Log
logger.info(f"Novo caso processado: {caso['numero_notificacao']} - "
f"{caso['agravo_codigo']} em {caso['municipio_residencia']}")
def _caso_requer_alerta(self, caso: Dict) -> bool:
"""Verifica se caso requer alerta imediato"""
# Critérios de alerta
criterios = self.config.get('criterios_alerta', {})
# Verificar óbito
if caso.get('evolucao') == 'obito':
return True
# Verificar municípios prioritários
if caso['municipio_residencia'] in criterios.get('municipios_prioritarios', []):
return True
# Verificar agravos prioritários
if caso['agravo_codigo'] in criterios.get('agravos_prioritarios', []):
return True
return False
async def _enviar_alerta(self, caso: Dict):
"""Envia alerta sobre caso"""
mensagem = {
"tipo": "novo_caso_critico",
"timestamp": datetime.now().isoformat(),
"caso": {
"numero": caso['numero_notificacao'],
"agravo": caso['agravo_codigo'],
"municipio": caso['municipio_residencia'],
"evolucao": caso.get('evolucao', 'em_tratamento')
}
}
# Enviar para diferentes canais
for canal in self.config.get('canais_alerta', []):
if canal['tipo'] == 'webhook':
await self._enviar_webhook(canal['url'], mensagem)
elif canal['tipo'] == 'email':
await self._enviar_email(canal['endereco'], mensagem)
# Configuração e execução
config = {
"api_url": "https://api.sinapse.org.br/v1",
"email": "[email protected]",
"senha": "SenhaForteBot123!",
"intervalo_checagem": 30, # segundos
"criterios_alerta": {
"municipios_prioritarios": ["São Paulo", "Rio de Janeiro"],
"agravos_prioritarios": ["dengue_hemorragica", "covid19"]
},
"canais_alerta": [
{
"tipo": "webhook",
"url": "https://hooks.slack.com/services/xxx"
}
]
}
# Executar bot
async def main():
bot = BotMonitoramentoSinapse(config)
try:
await bot.iniciar()
finally:
await bot.parar()
# asyncio.run(main())