general refactor

now that this has been in use for a few months, generally feeling good
about it. this refactors to use some more concrete types (note using
TypedDict generally because most of this is uncontrolled input from
third-party json). otherwise this should not change anything regarding
functionality, just implementation.
This commit is contained in:
Matthew Ryan Dillon 2025-08-30 11:46:09 -04:00
parent 2a3538ac9e
commit 012a474742

653
main.py
View file

@ -1,230 +1,535 @@
import asyncio
from datetime import datetime, timedelta
from typing import TypedDict
import functools
import logging
import os
import pprint
import zoneinfo
from urllib.parse import urljoin, urlencode
from fastapi import FastAPI, HTTPException
from cachetools import TTLCache
import httpx
pp = pprint.PrettyPrinter()
logger = logging.getLogger("uvicorn.error")
eastern = zoneinfo.ZoneInfo("America/New_York")
app = FastAPI()
weather_cache = TTLCache(maxsize=100, ttl=900) # 15 minutes
pollen_cache = TTLCache(maxsize=100, ttl=900) # 15 minutes
CONFIG = {
# salem, ma
"zip": "01970",
"lat": "42.3554334",
"lon": "-71.060511",
"weather_api_key": os.environ.get("WEATHER_API_KEY", ""),
"auth_token": os.environ["AUTH_TOKEN"],
}
EASTERN_TZ = zoneinfo.ZoneInfo("America/New_York")
CACHE_TTL_SECONDS = 900 # 15 minutes
CACHE_MAX_SIZE = 100
POLLEN_MAX_INDEX = 12.0
HTTP_TIMEOUT = 10.0
DEFAULT_ZIP_CODE = "01970" # salem, ma
DEFAULT_LATITUDE = "42.3554334"
DEFAULT_LONGITUDE = "-71.060511"
def format_date(dt):
dt = dt.astimezone(eastern)
return dt.strftime("%a %d").lower()
def get_required_env(key: str) -> str:
"""Get required environment variable or raise error."""
value = os.environ.get(key)
if not value:
raise ValueError(f"Required environment variable {key} is not set")
return value
def format_time(dt):
dt = dt.astimezone(eastern)
return dt.strftime("%I:%M%p").lower()
class WeatherCondition(TypedDict):
description: str
main: str
id: int
icon: str
def format_datetime(dt):
dt = dt.astimezone(eastern)
return f"{format_date(dt)} {format_time(dt)}"
class CurrentWeather(TypedDict):
dt: int
sunrise: int
sunset: int
temp: float
feels_like: float
pressure: int
humidity: int
weather: list[WeatherCondition]
def relative_day_to_date(rel_dt):
dt = datetime.now()
day = timedelta(days=1)
match rel_dt.lower().strip():
case "yesterday":
return format_date(dt - day)
case "today":
return format_date(dt)
case "tomorrow":
return format_date(dt + day)
case passthrough:
return passthrough
class DailyTemperature(TypedDict):
min: float
max: float
def build_daily_data(date, pollen_periods, weather_periods):
daily_data = {}
if date in pollen_periods:
daily_data["pollen"] = pollen_periods[date]["index"]
if date in weather_periods:
weather_data = weather_periods[date]
daily_data.update({
"low": weather_data["low"],
"high": weather_data["high"],
"desc": weather_data["desc"],
"humidity": weather_data["humidity"],
"sunrise": weather_data["sunrise"],
"sunset": weather_data["sunset"],
"pressure": weather_data["pressure"],
})
return daily_data
class DailyWeather(TypedDict):
dt: int
sunrise: int
sunset: int
temp: DailyTemperature
humidity: int
pressure: int
weather: list[WeatherCondition]
def fallback_handler(func):
class WeatherApiResponse(TypedDict):
current: CurrentWeather
daily: list[DailyWeather]
class PollenLocation(TypedDict):
periods: list[dict[str, str | int | float]]
class PollenApiResponse(TypedDict):
ForecastDate: str
Location: PollenLocation
class WeatherPeriod(TypedDict):
low: int
high: int
desc: str
humidity: int
sunrise: str
sunset: str
pressure: int
period: str
class PollenPeriod(TypedDict):
index: int
period: str
class WeatherReport(TypedDict):
forecast_date: str
current_temp: int
current_feels_like: int
current_humidity: int
sunrise: str
sunset: str
current_pressure: int
current_desc: str
periods: list[WeatherPeriod]
class PollenReport(TypedDict):
forecast_date: str
periods: list[PollenPeriod]
class CurrentWeatherData(TypedDict):
temp: int
feels_like: int
desc: str
humidity: int
pressure: int
class DailyData(TypedDict, total=False):
pollen: int
low: int
high: int
desc: str
humidity: int
sunrise: str
sunset: str
pressure: int
class FinalReport(TypedDict):
fetched_at: str
current: CurrentWeatherData
today: DailyData
tomorrow: DailyData
app = FastAPI(title="trmnl weather & pollen report")
weather_cache: TTLCache = TTLCache(maxsize=CACHE_MAX_SIZE, ttl=CACHE_TTL_SECONDS)
pollen_cache: TTLCache = TTLCache(maxsize=CACHE_MAX_SIZE, ttl=CACHE_TTL_SECONDS)
class Config:
"""Application configuration."""
def __init__(
self,
zip_code: str,
latitude: str,
longitude: str,
weather_api_key: str,
auth_token: str,
):
self.zip_code = zip_code
self.latitude = latitude
self.longitude = longitude
self.weather_api_key = weather_api_key
self.auth_token = auth_token
config = Config(
zip_code=DEFAULT_ZIP_CODE,
latitude=DEFAULT_LATITUDE,
longitude=DEFAULT_LONGITUDE,
weather_api_key=get_required_env("WEATHER_API_KEY"),
auth_token=get_required_env("AUTH_TOKEN"),
)
class DateTimeFormatter:
"""Utility class for datetime formatting."""
@staticmethod
def format_date(dt: datetime) -> str:
"""Format datetime as abbreviated date string (e.g., 'mon 15')."""
dt = dt.astimezone(EASTERN_TZ)
return dt.strftime("%a %d").lower()
@staticmethod
def format_time(dt: datetime) -> str:
"""Format datetime as time string (e.g., '02:30pm')."""
dt = dt.astimezone(EASTERN_TZ)
return dt.strftime("%I:%M%p").lower()
@staticmethod
def format_datetime(dt: datetime) -> str:
"""Format datetime as date and time string."""
return f"{DateTimeFormatter.format_date(dt)} {DateTimeFormatter.format_time(dt)}"
@staticmethod
def relative_day_to_date(relative_day: str) -> str:
"""Convert relative day string to formatted date string."""
now = datetime.now()
day_delta = timedelta(days=1)
relative_day = relative_day.lower().strip()
if relative_day == "yesterday":
return DateTimeFormatter.format_date(now - day_delta)
elif relative_day == "today":
return DateTimeFormatter.format_date(now)
elif relative_day == "tomorrow":
return DateTimeFormatter.format_date(now + day_delta)
else:
return relative_day
class WeatherData:
"""Data model for weather information."""
def __init__(self, raw_data: WeatherApiResponse):
self.raw_data = raw_data
self._validate_data()
def _validate_data(self) -> None:
"""Validate required fields in weather data."""
required_fields = ["current", "daily"]
for field in required_fields:
if field not in self.raw_data:
raise ValueError(f"Missing required field in weather data: {field}")
@property
def current(self) -> CurrentWeather:
"""Get current weather data."""
return self.raw_data["current"]
@property
def daily_periods(self) -> list[DailyWeather]:
"""Get daily forecast periods (limited to next 2 days)."""
return self.raw_data["daily"][:2]
class PollenData:
"""Data model for pollen information."""
def __init__(self, raw_data: PollenApiResponse):
self.raw_data = raw_data
self._validate_data()
def _validate_data(self) -> None:
"""Validate required fields in pollen data."""
required_fields = ["ForecastDate", "Location"]
for field in required_fields:
if field not in self.raw_data:
raise ValueError(f"Missing required field in pollen data: {field}")
@property
def forecast_date(self) -> str:
"""Get formatted forecast date."""
dt = datetime.fromisoformat(self.raw_data["ForecastDate"])
return DateTimeFormatter.format_datetime(dt)
@property
def periods(self) -> list[PollenPeriod]:
"""Get pollen periods for today and tomorrow."""
periods = self.raw_data["Location"].get("periods", [])
valid_periods: list[PollenPeriod] = []
for period in periods:
period_type = str(period.get("Type", "")).lower().strip()
if period_type in ["today", "tomorrow"]:
index_value = float(period.get("Index", 0))
# convert pollen index to percentage scale
pollen_percentage = int(index_value / POLLEN_MAX_INDEX * 100)
valid_periods.append(
PollenPeriod(
{
"index": pollen_percentage,
"period": DateTimeFormatter.relative_day_to_date(period_type),
}
)
)
return valid_periods
def error_handler(func):
"""Decorator to handle exceptions in async functions."""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
try:
result = await func(*args, **kwargs)
return await func(*args, **kwargs)
except Exception as e:
logger.exception(e)
logger.exception(f"Error in {func.__name__}: {e}")
return []
return result
return wrapper
@fallback_handler
async def fetch_pollen(zipcode):
if zipcode in pollen_cache:
return pollen_cache[zipcode]
class WeatherService:
"""Service for fetching weather data."""
url = f"https://www.pollen.com/api/forecast/current/pollen/{zipcode}"
headers = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/123.0.0.0 Safari/537.36"
),
"Accept": "application/json, text/plain, */*",
"Referer": url,
"Cookie": f"geo={zipcode}",
}
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url, headers=headers)
response.raise_for_status()
data = response.json()
result = [
{
"forecast_date": format_datetime(
datetime.fromisoformat(data["ForecastDate"])
),
"periods": [
{
"index": int(d["Index"] / 12. * 100),
"period": relative_day_to_date(d["Type"]),
}
for d in data["Location"]["periods"]
if d["Type"].lower().strip() in ["today", "tomorrow"]
],
}
@staticmethod
@error_handler
async def fetch_weather(latitude: str, longitude: str, api_key: str) -> list[WeatherReport]:
"""Fetch weather data from OpenWeatherMap API."""
cache_key = (latitude, longitude)
if cache_key in weather_cache:
return weather_cache[cache_key]
base_url = "https://api.openweathermap.org/data/3.0/onecall"
params = {
"lat": latitude,
"lon": longitude,
"appid": api_key,
"units": "imperial",
}
url = f"{base_url}?{urlencode(params)}"
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.get(url)
response.raise_for_status()
raw_data = response.json()
weather_data = WeatherData(raw_data)
result = WeatherService._format_weather_data(weather_data)
weather_cache[cache_key] = result
return result
@staticmethod
def _format_weather_data(weather_data: WeatherData) -> list[WeatherReport]:
"""Format weather data for API response."""
current = weather_data.current
daily_periods = weather_data.daily_periods
current_dt = datetime.fromtimestamp(current["dt"])
return [
WeatherReport(
{
"forecast_date": DateTimeFormatter.format_datetime(current_dt),
"current_temp": int(round(current["temp"])),
"current_feels_like": int(round(current["feels_like"])),
"current_humidity": current["humidity"],
"sunrise": DateTimeFormatter.format_time(
datetime.fromtimestamp(current["sunrise"])
),
"sunset": DateTimeFormatter.format_time(
datetime.fromtimestamp(current["sunset"])
),
"current_pressure": current["pressure"],
"current_desc": current["weather"][0]["description"],
"periods": [
WeatherService._format_daily_period(period) for period in daily_periods
],
}
)
]
pollen_cache[zipcode] = result
return result
@staticmethod
def _format_daily_period(period: DailyWeather) -> WeatherPeriod:
"""Format a single daily weather period."""
period_dt = datetime.fromtimestamp(period["dt"])
@fallback_handler
async def fetch_weather(lat, lon, weather_api_key):
cache_key = (lat, lon)
if cache_key in weather_cache:
return weather_cache[cache_key]
url = f"https://api.openweathermap.org/data/3.0/onecall?lat={lat}&lon={lon}&appid={weather_api_key}&units=imperial"
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url)
response.raise_for_status()
data = response.json()
current, periods = data["current"], data["daily"][:2]
result = [
return WeatherPeriod(
{
"forecast_date": format_datetime(datetime.fromtimestamp(current["dt"])),
"current_temp": int(round(current["temp"])),
"current_feels_like": int(round(current["feels_like"])),
"current_humidity": current["humidity"],
"sunrise": format_time(datetime.fromtimestamp(current["sunrise"])),
"sunset": format_time(datetime.fromtimestamp(current["sunset"])),
"current_pressure": current["pressure"],
"current_desc": current["weather"][0]["description"],
"periods": [
{
"low": int(round(p["temp"]["min"])),
"high": int(round(p["temp"]["max"])),
"desc": p["weather"][0]["description"],
"humidity": p["humidity"],
"sunrise": format_time(datetime.fromtimestamp(p["sunrise"])),
"sunset": format_time(datetime.fromtimestamp(p["sunset"])),
"pressure": p["pressure"],
"period": format_date(datetime.fromtimestamp(p["dt"])),
}
for p in periods
],
"low": int(round(period["temp"]["min"])),
"high": int(round(period["temp"]["max"])),
"desc": period["weather"][0]["description"],
"humidity": period["humidity"],
"sunrise": DateTimeFormatter.format_time(datetime.fromtimestamp(period["sunrise"])),
"sunset": DateTimeFormatter.format_time(datetime.fromtimestamp(period["sunset"])),
"pressure": period["pressure"],
"period": DateTimeFormatter.format_date(period_dt),
}
]
weather_cache[cache_key] = result
return result
)
class PollenService:
"""Service for fetching pollen data."""
@staticmethod
@error_handler
async def fetch_pollen(zip_code: str) -> list[PollenReport]:
"""Fetch pollen data from pollen.com API."""
if zip_code in pollen_cache:
return pollen_cache[zip_code]
base_url = "https://www.pollen.com/api/forecast/current/pollen/"
url = urljoin(base_url, zip_code)
headers = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/123.0.0.0 Safari/537.36"
),
"Accept": "application/json, text/plain, */*",
"Referer": url,
"Cookie": f"geo={zip_code}",
}
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.get(url, headers=headers)
response.raise_for_status()
raw_data = response.json()
pollen_data = PollenData(raw_data)
result = [
PollenReport(
{
"forecast_date": pollen_data.forecast_date,
"periods": pollen_data.periods,
}
)
]
pollen_cache[zip_code] = result
return result
class DataAggregator:
"""Service for aggregating weather and pollen data."""
@staticmethod
def build_daily_data(
date: str,
pollen_periods: dict[str, PollenPeriod],
weather_periods: dict[str, WeatherPeriod],
) -> DailyData:
"""Build daily data combining weather and pollen information."""
daily_data: DailyData = {}
if date in pollen_periods:
daily_data["pollen"] = pollen_periods[date]["index"]
if date in weather_periods:
weather_data = weather_periods[date]
daily_data.update(
{
"low": weather_data["low"],
"high": weather_data["high"],
"desc": weather_data["desc"],
"humidity": weather_data["humidity"],
"sunrise": weather_data["sunrise"],
"sunset": weather_data["sunset"],
"pressure": weather_data["pressure"],
}
)
return daily_data
@staticmethod
def create_periods_lookup(
pollen_data: list[PollenReport], weather_data: list[WeatherReport]
) -> tuple[dict[str, PollenPeriod], dict[str, WeatherPeriod], CurrentWeatherData]:
"""Create lookup dictionaries for pollen and weather periods."""
pollen_periods: dict[str, PollenPeriod] = {}
weather_periods: dict[str, WeatherPeriod] = {}
if pollen_data and pollen_data[0].get("periods"):
pollen_periods = {p["period"]: p for p in pollen_data[0]["periods"]}
if weather_data and weather_data[0].get("periods"):
weather_periods = {p["period"]: p for p in weather_data[0]["periods"]}
current_weather_info = CurrentWeatherData(
{
"temp": 0,
"feels_like": 0,
"desc": "",
"humidity": 0,
"pressure": 0,
}
)
if weather_data and weather_data[0]:
data = weather_data[0]
current_weather_info = CurrentWeatherData(
{
"temp": data["current_temp"],
"feels_like": data["current_feels_like"],
"desc": data["current_desc"],
"humidity": data["current_humidity"],
"pressure": data["current_pressure"],
}
)
return pollen_periods, weather_periods, current_weather_info
@app.get("/")
async def read_root(token: str):
if token != CONFIG["auth_token"]:
raise HTTPException(status_code=403, detail="unauthorized")
async def get_weather_pollen_report(token: str) -> FinalReport:
"""
Get weather and pollen report.
[
pollen,
weather,
] = await asyncio.gather(
fetch_pollen(CONFIG["zip"]),
fetch_weather(CONFIG["lat"], CONFIG["lon"], CONFIG["weather_api_key"]),
Args:
token: Authentication token
Returns:
Dictionary containing current, today, and tomorrow weather/pollen data
Raises:
HTTPException: If authentication fails
"""
if token != config.auth_token:
raise HTTPException(status_code=403, detail="Unauthorized")
pollen_data, weather_data = await asyncio.gather(
PollenService.fetch_pollen(config.zip_code),
WeatherService.fetch_weather(config.latitude, config.longitude, config.weather_api_key),
)
pollen_periods = {p["period"]: p for p in pollen[0]["periods"]} if pollen and pollen[0]["periods"] else {}
weather_periods = {p["period"]: p for p in weather[0]["periods"]} if weather and weather[0]["periods"] else {}
pollen_periods, weather_periods, current_weather_info = DataAggregator.create_periods_lookup(
pollen_data, weather_data
)
# Add current weather data as a "current" period
if weather and weather[0]:
data = weather[0]
weather_periods["current"] = {
"period": "current",
"temp": data["current_temp"],
"feels_like": data["current_feels_like"],
"humidity": data["current_humidity"],
"pressure": data["current_pressure"],
"desc": data["current_desc"],
"sunrise": data["sunrise"],
"sunset": data["sunset"]
now = datetime.now()
today_date = DateTimeFormatter.format_date(now)
tomorrow_date = DateTimeFormatter.format_date(now + timedelta(days=1))
result: FinalReport = FinalReport(
{
"fetched_at": DateTimeFormatter.format_datetime(now),
"current": current_weather_info,
"today": DailyData({}),
"tomorrow": DailyData({}),
}
)
today_date = format_date(datetime.now())
tomorrow_date = format_date(datetime.now() + timedelta(days=1))
result = {
"fetched_at": format_datetime(datetime.now()),
"current": {},
"today": {},
"tomorrow": {},
}
if "current" in weather_periods:
weather_data = weather_periods["current"]
result["current"] = {
"temp": weather_data["temp"],
"feels_like": weather_data["feels_like"],
"desc": weather_data["desc"],
"humidity": weather_data["humidity"],
"pressure": weather_data["pressure"]
}
today_data = build_daily_data(today_date, pollen_periods, weather_periods)
today_data = DataAggregator.build_daily_data(today_date, pollen_periods, weather_periods)
if today_data:
result["today"] = today_data
tomorrow_data = build_daily_data(tomorrow_date, pollen_periods, weather_periods)
tomorrow_data = DataAggregator.build_daily_data(tomorrow_date, pollen_periods, weather_periods)
if tomorrow_data:
result["tomorrow"] = tomorrow_data