WIP: refactor

This commit is contained in:
Matthew Ryan Dillon 2025-08-30 11:46:09 -04:00
parent 2a3538ac9e
commit 87317cb685

603
main.py
View file

@ -1,9 +1,9 @@
import asyncio
from datetime import datetime, timedelta
from typing import TypedDict, Union
import functools
import logging
import os
import pprint
import zoneinfo
from fastapi import FastAPI, HTTPException
@ -11,221 +11,474 @@ from cachetools import TTLCache
import httpx
pp = pprint.PrettyPrinter()
logger = logging.getLogger("uvicorn.error")
eastern = zoneinfo.ZoneInfo("America/New_York")
app = FastAPI()
EASTERN_TZ = zoneinfo.ZoneInfo("America/New_York")
CACHE_TTL_SECONDS = 900 # 15 minutes
CACHE_MAX_SIZE = 100
POLLEN_MAX_INDEX = 12.0
POLLEN_PERCENTAGE_SCALE = 100
HTTP_TIMEOUT = 10.0
weather_cache = TTLCache(maxsize=100, ttl=900) # 15 minutes
pollen_cache = TTLCache(maxsize=100, ttl=900) # 15 minutes
class WeatherCondition(TypedDict):
description: str
main: str
id: int
icon: str
CONFIG = {
# salem, ma
"zip": "01970",
"lat": "42.3554334",
"lon": "-71.060511",
"weather_api_key": os.environ.get("WEATHER_API_KEY", ""),
"auth_token": os.environ["AUTH_TOKEN"],
}
class CurrentWeather(TypedDict):
dt: int
sunrise: int
sunset: int
temp: float
feels_like: float
pressure: int
humidity: int
weather: list[WeatherCondition]
class DailyTemperature(TypedDict):
min: float
max: float
class DailyWeather(TypedDict):
dt: int
sunrise: int
sunset: int
temp: DailyTemperature
humidity: int
pressure: int
weather: list[WeatherCondition]
class WeatherApiResponse(TypedDict):
current: CurrentWeather
daily: list[DailyWeather]
class PollenLocation(TypedDict):
periods: list[dict[str, Union[str, int, float]]]
class PollenApiResponse(TypedDict):
ForecastDate: str
Location: PollenLocation
class WeatherPeriod(TypedDict):
low: int
high: int
desc: str
humidity: int
sunrise: str
sunset: str
pressure: int
period: str
class PollenPeriod(TypedDict):
index: int
period: str
class WeatherReport(TypedDict):
forecast_date: str
current_temp: int
current_feels_like: int
current_humidity: int
sunrise: str
sunset: str
current_pressure: int
current_desc: str
periods: list[WeatherPeriod]
class PollenReport(TypedDict):
forecast_date: str
periods: list[PollenPeriod]
class CurrentWeatherData(TypedDict):
temp: int
feels_like: int
desc: str
humidity: int
pressure: int
class DailyData(TypedDict, total=False):
pollen: int
low: int
high: int
desc: str
humidity: int
sunrise: str
sunset: str
pressure: int
class FinalReport(TypedDict):
fetched_at: str
current: CurrentWeatherData
today: DailyData
tomorrow: DailyData
app = FastAPI(title="TRMNL Weather & Pollen Report")
weather_cache: TTLCache = TTLCache(maxsize=CACHE_MAX_SIZE, ttl=CACHE_TTL_SECONDS)
pollen_cache: TTLCache = TTLCache(maxsize=CACHE_MAX_SIZE, ttl=CACHE_TTL_SECONDS)
def format_date(dt):
dt = dt.astimezone(eastern)
return dt.strftime("%a %d").lower()
class Config:
"""Application configuration."""
def __init__(self):
self.zip_code = "01970" # Salem, MA
self.latitude = "42.3554334"
self.longitude = "-71.060511"
self.weather_api_key = self._get_required_env("WEATHER_API_KEY")
self.auth_token = self._get_required_env("AUTH_TOKEN")
@staticmethod
def _get_required_env(key: str) -> str:
"""Get required environment variable or raise error."""
value = os.environ.get(key)
if not value:
raise ValueError(f"Required environment variable {key} is not set")
return value
def format_time(dt):
dt = dt.astimezone(eastern)
return dt.strftime("%I:%M%p").lower()
config = Config()
def format_datetime(dt):
dt = dt.astimezone(eastern)
return f"{format_date(dt)} {format_time(dt)}"
class DateTimeFormatter:
"""Utility class for datetime formatting."""
@staticmethod
def format_date(dt: datetime) -> str:
"""Format datetime as abbreviated date string (e.g., 'mon 15')."""
dt = dt.astimezone(EASTERN_TZ)
return dt.strftime("%a %d").lower()
@staticmethod
def format_time(dt: datetime) -> str:
"""Format datetime as time string (e.g., '02:30pm')."""
dt = dt.astimezone(EASTERN_TZ)
return dt.strftime("%I:%M%p").lower()
@staticmethod
def format_datetime(dt: datetime) -> str:
"""Format datetime as date and time string."""
return f"{DateTimeFormatter.format_date(dt)} {DateTimeFormatter.format_time(dt)}"
@staticmethod
def relative_day_to_date(relative_day: str) -> str:
"""Convert relative day string to formatted date string."""
now = datetime.now()
day_delta = timedelta(days=1)
relative_day = relative_day.lower().strip()
if relative_day == "yesterday":
return DateTimeFormatter.format_date(now - day_delta)
elif relative_day == "today":
return DateTimeFormatter.format_date(now)
elif relative_day == "tomorrow":
return DateTimeFormatter.format_date(now + day_delta)
else:
return relative_day
def relative_day_to_date(rel_dt):
dt = datetime.now()
day = timedelta(days=1)
match rel_dt.lower().strip():
case "yesterday":
return format_date(dt - day)
case "today":
return format_date(dt)
case "tomorrow":
return format_date(dt + day)
case passthrough:
return passthrough
class WeatherData:
"""Data model for weather information."""
def __init__(self, raw_data: WeatherApiResponse):
self.raw_data = raw_data
self._validate_data()
def _validate_data(self) -> None:
"""Validate required fields in weather data."""
required_fields = ["current", "daily"]
for field in required_fields:
if field not in self.raw_data:
raise ValueError(f"Missing required field in weather data: {field}")
@property
def current(self) -> CurrentWeather:
"""Get current weather data."""
return self.raw_data["current"]
@property
def daily_periods(self) -> list[DailyWeather]:
"""Get daily forecast periods (limited to next 2 days)."""
return self.raw_data["daily"][:2]
def build_daily_data(date, pollen_periods, weather_periods):
daily_data = {}
if date in pollen_periods:
daily_data["pollen"] = pollen_periods[date]["index"]
if date in weather_periods:
weather_data = weather_periods[date]
daily_data.update({
"low": weather_data["low"],
"high": weather_data["high"],
"desc": weather_data["desc"],
"humidity": weather_data["humidity"],
"sunrise": weather_data["sunrise"],
"sunset": weather_data["sunset"],
"pressure": weather_data["pressure"],
})
return daily_data
class PollenData:
"""Data model for pollen information."""
def __init__(self, raw_data: PollenApiResponse):
self.raw_data = raw_data
self._validate_data()
def _validate_data(self) -> None:
"""Validate required fields in pollen data."""
required_fields = ["ForecastDate", "Location"]
for field in required_fields:
if field not in self.raw_data:
raise ValueError(f"Missing required field in pollen data: {field}")
@property
def forecast_date(self) -> str:
"""Get formatted forecast date."""
dt = datetime.fromisoformat(self.raw_data["ForecastDate"])
return DateTimeFormatter.format_datetime(dt)
@property
def periods(self) -> list[PollenPeriod]:
"""Get pollen periods for today and tomorrow."""
periods = self.raw_data["Location"].get("periods", [])
valid_periods: list[PollenPeriod] = []
for period in periods:
period_type = str(period.get("Type", "")).lower().strip()
if period_type in ["today", "tomorrow"]:
index_value = float(period.get("Index", 0))
# Convert pollen index to percentage scale (0-100)
pollen_percentage = int(index_value / POLLEN_MAX_INDEX * POLLEN_PERCENTAGE_SCALE)
valid_periods.append(PollenPeriod({
"index": pollen_percentage,
"period": DateTimeFormatter.relative_day_to_date(period_type),
}))
return valid_periods
def fallback_handler(func):
def error_handler(func):
"""Decorator to handle exceptions in async functions."""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
try:
result = await func(*args, **kwargs)
return await func(*args, **kwargs)
except Exception as e:
logger.exception(e)
logger.exception(f"Error in {func.__name__}: {e}")
return []
return result
return wrapper
@fallback_handler
async def fetch_pollen(zipcode):
if zipcode in pollen_cache:
return pollen_cache[zipcode]
class WeatherService:
"""Service for fetching weather data."""
url = f"https://www.pollen.com/api/forecast/current/pollen/{zipcode}"
headers = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/123.0.0.0 Safari/537.36"
),
"Accept": "application/json, text/plain, */*",
"Referer": url,
"Cookie": f"geo={zipcode}",
}
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url, headers=headers)
response.raise_for_status()
data = response.json()
result = [
{
"forecast_date": format_datetime(
datetime.fromisoformat(data["ForecastDate"])
),
"periods": [
{
"index": int(d["Index"] / 12. * 100),
"period": relative_day_to_date(d["Type"]),
}
for d in data["Location"]["periods"]
if d["Type"].lower().strip() in ["today", "tomorrow"]
],
@staticmethod
@error_handler
async def fetch_weather(latitude: str, longitude: str, api_key: str) -> list[WeatherReport]:
"""Fetch weather data from OpenWeatherMap API."""
cache_key = (latitude, longitude)
if cache_key in weather_cache:
return weather_cache[cache_key]
url = (
f"https://api.openweathermap.org/data/3.0/onecall"
f"?lat={latitude}&lon={longitude}&appid={api_key}&units=imperial"
)
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.get(url)
response.raise_for_status()
raw_data = response.json()
weather_data = WeatherData(raw_data)
result = WeatherService._format_weather_data(weather_data)
weather_cache[cache_key] = result
return result
@staticmethod
def _format_weather_data(weather_data: WeatherData) -> list[WeatherReport]:
"""Format weather data for API response."""
current = weather_data.current
daily_periods = weather_data.daily_periods
current_dt = datetime.fromtimestamp(current["dt"])
return [WeatherReport({
"forecast_date": DateTimeFormatter.format_datetime(current_dt),
"current_temp": int(round(current["temp"])),
"current_feels_like": int(round(current["feels_like"])),
"current_humidity": current["humidity"],
"sunrise": DateTimeFormatter.format_time(datetime.fromtimestamp(current["sunrise"])),
"sunset": DateTimeFormatter.format_time(datetime.fromtimestamp(current["sunset"])),
"current_pressure": current["pressure"],
"current_desc": current["weather"][0]["description"],
"periods": [
WeatherService._format_daily_period(period)
for period in daily_periods
],
})]
@staticmethod
def _format_daily_period(period: DailyWeather) -> WeatherPeriod:
"""Format a single daily weather period."""
period_dt = datetime.fromtimestamp(period["dt"])
return WeatherPeriod({
"low": int(round(period["temp"]["min"])),
"high": int(round(period["temp"]["max"])),
"desc": period["weather"][0]["description"],
"humidity": period["humidity"],
"sunrise": DateTimeFormatter.format_time(datetime.fromtimestamp(period["sunrise"])),
"sunset": DateTimeFormatter.format_time(datetime.fromtimestamp(period["sunset"])),
"pressure": period["pressure"],
"period": DateTimeFormatter.format_date(period_dt),
})
class PollenService:
"""Service for fetching pollen data."""
@staticmethod
@error_handler
async def fetch_pollen(zip_code: str) -> list[PollenReport]:
"""Fetch pollen data from pollen.com API."""
if zip_code in pollen_cache:
return pollen_cache[zip_code]
url = f"https://www.pollen.com/api/forecast/current/pollen/{zip_code}"
headers = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/123.0.0.0 Safari/537.36"
),
"Accept": "application/json, text/plain, */*",
"Referer": url,
"Cookie": f"geo={zip_code}",
}
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.get(url, headers=headers)
response.raise_for_status()
raw_data = response.json()
pollen_data = PollenData(raw_data)
result = [PollenReport({
"forecast_date": pollen_data.forecast_date,
"periods": pollen_data.periods,
})]
pollen_cache[zip_code] = result
return result
class DataAggregator:
"""Service for aggregating weather and pollen data."""
@staticmethod
def build_daily_data(
date: str,
pollen_periods: dict[str, PollenPeriod],
weather_periods: dict[str, WeatherPeriod]
) -> DailyData:
"""Build daily data combining weather and pollen information."""
daily_data: DailyData = {}
if date in pollen_periods:
daily_data["pollen"] = pollen_periods[date]["index"]
if date in weather_periods:
weather_data = weather_periods[date]
daily_data.update({
"low": weather_data["low"],
"high": weather_data["high"],
"desc": weather_data["desc"],
"humidity": weather_data["humidity"],
"sunrise": weather_data["sunrise"],
"sunset": weather_data["sunset"],
"pressure": weather_data["pressure"],
})
return daily_data
@staticmethod
def create_periods_lookup(
pollen_data: list[PollenReport],
weather_data: list[WeatherReport]
) -> tuple[dict[str, PollenPeriod], dict[str, WeatherPeriod], CurrentWeatherData]:
"""Create lookup dictionaries for pollen and weather periods."""
pollen_periods: dict[str, PollenPeriod] = {}
weather_periods: dict[str, WeatherPeriod] = {}
if pollen_data and pollen_data[0].get("periods"):
pollen_periods = {
p["period"]: p for p in pollen_data[0]["periods"]
}
]
pollen_cache[zipcode] = result
return result
@fallback_handler
async def fetch_weather(lat, lon, weather_api_key):
cache_key = (lat, lon)
if cache_key in weather_cache:
return weather_cache[cache_key]
url = f"https://api.openweathermap.org/data/3.0/onecall?lat={lat}&lon={lon}&appid={weather_api_key}&units=imperial"
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url)
response.raise_for_status()
data = response.json()
current, periods = data["current"], data["daily"][:2]
result = [
{
"forecast_date": format_datetime(datetime.fromtimestamp(current["dt"])),
"current_temp": int(round(current["temp"])),
"current_feels_like": int(round(current["feels_like"])),
"current_humidity": current["humidity"],
"sunrise": format_time(datetime.fromtimestamp(current["sunrise"])),
"sunset": format_time(datetime.fromtimestamp(current["sunset"])),
"current_pressure": current["pressure"],
"current_desc": current["weather"][0]["description"],
"periods": [
{
"low": int(round(p["temp"]["min"])),
"high": int(round(p["temp"]["max"])),
"desc": p["weather"][0]["description"],
"humidity": p["humidity"],
"sunrise": format_time(datetime.fromtimestamp(p["sunrise"])),
"sunset": format_time(datetime.fromtimestamp(p["sunset"])),
"pressure": p["pressure"],
"period": format_date(datetime.fromtimestamp(p["dt"])),
}
for p in periods
],
if weather_data and weather_data[0].get("periods"):
weather_periods = {
p["period"]: p for p in weather_data[0]["periods"]
}
]
weather_cache[cache_key] = result
return result
current_weather_info = CurrentWeatherData({
"temp": 0,
"feels_like": 0,
"desc": "",
"humidity": 0,
"pressure": 0,
})
if weather_data and weather_data[0]:
data = weather_data[0]
current_weather_info = CurrentWeatherData({
"temp": data["current_temp"],
"feels_like": data["current_feels_like"],
"desc": data["current_desc"],
"humidity": data["current_humidity"],
"pressure": data["current_pressure"],
})
return pollen_periods, weather_periods, current_weather_info
@app.get("/")
async def read_root(token: str):
if token != CONFIG["auth_token"]:
raise HTTPException(status_code=403, detail="unauthorized")
async def get_weather_pollen_report(token: str) -> FinalReport:
"""
Get weather and pollen report.
[
pollen,
weather,
] = await asyncio.gather(
fetch_pollen(CONFIG["zip"]),
fetch_weather(CONFIG["lat"], CONFIG["lon"], CONFIG["weather_api_key"]),
Args:
token: Authentication token
Returns:
Dictionary containing current, today, and tomorrow weather/pollen data
Raises:
HTTPException: If authentication fails
"""
if token != config.auth_token:
raise HTTPException(status_code=403, detail="Unauthorized")
pollen_data, weather_data = await asyncio.gather(
PollenService.fetch_pollen(config.zip_code),
WeatherService.fetch_weather(config.latitude, config.longitude, config.weather_api_key),
)
pollen_periods = {p["period"]: p for p in pollen[0]["periods"]} if pollen and pollen[0]["periods"] else {}
weather_periods = {p["period"]: p for p in weather[0]["periods"]} if weather and weather[0]["periods"] else {}
pollen_periods, weather_periods, current_weather_info = DataAggregator.create_periods_lookup(
pollen_data, weather_data
)
# Add current weather data as a "current" period
if weather and weather[0]:
data = weather[0]
weather_periods["current"] = {
"period": "current",
"temp": data["current_temp"],
"feels_like": data["current_feels_like"],
"humidity": data["current_humidity"],
"pressure": data["current_pressure"],
"desc": data["current_desc"],
"sunrise": data["sunrise"],
"sunset": data["sunset"]
}
now = datetime.now()
today_date = DateTimeFormatter.format_date(now)
tomorrow_date = DateTimeFormatter.format_date(now + timedelta(days=1))
today_date = format_date(datetime.now())
tomorrow_date = format_date(datetime.now() + timedelta(days=1))
result: FinalReport = FinalReport({
"fetched_at": DateTimeFormatter.format_datetime(now),
"current": current_weather_info,
"today": DailyData({}),
"tomorrow": DailyData({}),
})
result = {
"fetched_at": format_datetime(datetime.now()),
"current": {},
"today": {},
"tomorrow": {},
}
if "current" in weather_periods:
weather_data = weather_periods["current"]
result["current"] = {
"temp": weather_data["temp"],
"feels_like": weather_data["feels_like"],
"desc": weather_data["desc"],
"humidity": weather_data["humidity"],
"pressure": weather_data["pressure"]
}
today_data = build_daily_data(today_date, pollen_periods, weather_periods)
today_data = DataAggregator.build_daily_data(today_date, pollen_periods, weather_periods)
if today_data:
result["today"] = today_data
tomorrow_data = build_daily_data(tomorrow_date, pollen_periods, weather_periods)
tomorrow_data = DataAggregator.build_daily_data(tomorrow_date, pollen_periods, weather_periods)
if tomorrow_data:
result["tomorrow"] = tomorrow_data
return result
@app.get("/health")
async def health_check() -> dict[str, str]:
"""Health check endpoint."""
return {"status": "healthy"}