wip refactor

This commit is contained in:
Matthew Ryan Dillon 2025-08-30 11:46:09 -04:00
parent 2a3538ac9e
commit 44885152b5

494
main.py
View file

@ -1,9 +1,9 @@
import asyncio import asyncio
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
import functools import functools
import logging import logging
import os import os
import pprint
import zoneinfo import zoneinfo
from fastapi import FastAPI, HTTPException from fastapi import FastAPI, HTTPException
@ -11,58 +11,284 @@ from cachetools import TTLCache
import httpx import httpx
pp = pprint.PrettyPrinter() # Configure logging
logger = logging.getLogger("uvicorn.error") logger = logging.getLogger("uvicorn.error")
eastern = zoneinfo.ZoneInfo("America/New_York")
app = FastAPI() # Constants
EASTERN_TZ = zoneinfo.ZoneInfo("America/New_York")
CACHE_TTL_SECONDS = 900 # 15 minutes
CACHE_MAX_SIZE = 100
POLLEN_MAX_INDEX = 12.0
POLLEN_PERCENTAGE_SCALE = 100
HTTP_TIMEOUT = 10.0
weather_cache = TTLCache(maxsize=100, ttl=900) # 15 minutes # Initialize FastAPI app
pollen_cache = TTLCache(maxsize=100, ttl=900) # 15 minutes app = FastAPI(title="TRMNL Weather & Pollen Report")
CONFIG = { # Cache instances
# salem, ma weather_cache: TTLCache = TTLCache(maxsize=CACHE_MAX_SIZE, ttl=CACHE_TTL_SECONDS)
"zip": "01970", pollen_cache: TTLCache = TTLCache(maxsize=CACHE_MAX_SIZE, ttl=CACHE_TTL_SECONDS)
"lat": "42.3554334",
"lon": "-71.060511",
"weather_api_key": os.environ.get("WEATHER_API_KEY", ""),
"auth_token": os.environ["AUTH_TOKEN"],
}
def format_date(dt): class Config:
dt = dt.astimezone(eastern) """Application configuration."""
def __init__(self):
self.zip_code = "01970" # Salem, MA
self.latitude = "42.3554334"
self.longitude = "-71.060511"
self.weather_api_key = self._get_required_env("WEATHER_API_KEY")
self.auth_token = self._get_required_env("AUTH_TOKEN")
@staticmethod
def _get_required_env(key: str) -> str:
"""Get required environment variable or raise error."""
value = os.environ.get(key)
if not value:
raise ValueError(f"Required environment variable {key} is not set")
return value
# Global config instance
config = Config()
class DateTimeFormatter:
"""Utility class for datetime formatting."""
@staticmethod
def format_date(dt: datetime) -> str:
"""Format datetime as abbreviated date string (e.g., 'mon 15')."""
dt = dt.astimezone(EASTERN_TZ)
return dt.strftime("%a %d").lower() return dt.strftime("%a %d").lower()
@staticmethod
def format_time(dt): def format_time(dt: datetime) -> str:
dt = dt.astimezone(eastern) """Format datetime as time string (e.g., '02:30pm')."""
dt = dt.astimezone(EASTERN_TZ)
return dt.strftime("%I:%M%p").lower() return dt.strftime("%I:%M%p").lower()
@staticmethod
def format_datetime(dt: datetime) -> str:
"""Format datetime as date and time string."""
return f"{DateTimeFormatter.format_date(dt)} {DateTimeFormatter.format_time(dt)}"
def format_datetime(dt): @staticmethod
dt = dt.astimezone(eastern) def relative_day_to_date(relative_day: str) -> str:
return f"{format_date(dt)} {format_time(dt)}" """Convert relative day string to formatted date string."""
now = datetime.now()
day_delta = timedelta(days=1)
relative_day = relative_day.lower().strip()
if relative_day == "yesterday":
return DateTimeFormatter.format_date(now - day_delta)
elif relative_day == "today":
return DateTimeFormatter.format_date(now)
elif relative_day == "tomorrow":
return DateTimeFormatter.format_date(now + day_delta)
else:
return relative_day
def relative_day_to_date(rel_dt): class WeatherData:
dt = datetime.now() """Data model for weather information."""
day = timedelta(days=1)
match rel_dt.lower().strip(): def __init__(self, raw_data: Dict[str, Any]):
case "yesterday": self.raw_data = raw_data
return format_date(dt - day) self._validate_data()
case "today":
return format_date(dt) def _validate_data(self) -> None:
case "tomorrow": """Validate required fields in weather data."""
return format_date(dt + day) required_fields = ["current", "daily"]
case passthrough: for field in required_fields:
return passthrough if field not in self.raw_data:
raise ValueError(f"Missing required field in weather data: {field}")
@property
def current(self) -> Dict[str, Any]:
"""Get current weather data."""
return self.raw_data["current"]
@property
def daily_periods(self) -> List[Dict[str, Any]]:
"""Get daily forecast periods (limited to next 2 days)."""
return self.raw_data["daily"][:2]
def build_daily_data(date, pollen_periods, weather_periods): class PollenData:
"""Data model for pollen information."""
def __init__(self, raw_data: Dict[str, Any]):
self.raw_data = raw_data
self._validate_data()
def _validate_data(self) -> None:
"""Validate required fields in pollen data."""
required_fields = ["ForecastDate", "Location"]
for field in required_fields:
if field not in self.raw_data:
raise ValueError(f"Missing required field in pollen data: {field}")
@property
def forecast_date(self) -> str:
"""Get formatted forecast date."""
dt = datetime.fromisoformat(self.raw_data["ForecastDate"])
return DateTimeFormatter.format_datetime(dt)
@property
def periods(self) -> List[Dict[str, Any]]:
"""Get pollen periods for today and tomorrow."""
periods = self.raw_data["Location"].get("periods", [])
valid_periods = []
for period in periods:
period_type = period.get("Type", "").lower().strip()
if period_type in ["today", "tomorrow"]:
index_value = period.get("Index", 0)
# Convert pollen index to percentage scale (0-100)
pollen_percentage = int(index_value / POLLEN_MAX_INDEX * POLLEN_PERCENTAGE_SCALE)
valid_periods.append({
"index": pollen_percentage,
"period": DateTimeFormatter.relative_day_to_date(period_type),
})
return valid_periods
def error_handler(func):
"""Decorator to handle exceptions in async functions."""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
logger.exception(f"Error in {func.__name__}: {e}")
return []
return wrapper
class WeatherService:
"""Service for fetching weather data."""
@staticmethod
@error_handler
async def fetch_weather(latitude: str, longitude: str, api_key: str) -> List[Dict[str, Any]]:
"""Fetch weather data from OpenWeatherMap API."""
cache_key = (latitude, longitude)
if cache_key in weather_cache:
return weather_cache[cache_key]
url = (
f"https://api.openweathermap.org/data/3.0/onecall"
f"?lat={latitude}&lon={longitude}&appid={api_key}&units=imperial"
)
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.get(url)
response.raise_for_status()
raw_data = response.json()
weather_data = WeatherData(raw_data)
result = WeatherService._format_weather_data(weather_data)
weather_cache[cache_key] = result
return result
@staticmethod
def _format_weather_data(weather_data: WeatherData) -> List[Dict[str, Any]]:
"""Format weather data for API response."""
current = weather_data.current
daily_periods = weather_data.daily_periods
current_dt = datetime.fromtimestamp(current["dt"])
return [{
"forecast_date": DateTimeFormatter.format_datetime(current_dt),
"current_temp": int(round(current["temp"])),
"current_feels_like": int(round(current["feels_like"])),
"current_humidity": current["humidity"],
"sunrise": DateTimeFormatter.format_time(datetime.fromtimestamp(current["sunrise"])),
"sunset": DateTimeFormatter.format_time(datetime.fromtimestamp(current["sunset"])),
"current_pressure": current["pressure"],
"current_desc": current["weather"][0]["description"],
"periods": [
WeatherService._format_daily_period(period)
for period in daily_periods
],
}]
@staticmethod
def _format_daily_period(period: Dict[str, Any]) -> Dict[str, Any]:
"""Format a single daily weather period."""
period_dt = datetime.fromtimestamp(period["dt"])
return {
"low": int(round(period["temp"]["min"])),
"high": int(round(period["temp"]["max"])),
"desc": period["weather"][0]["description"],
"humidity": period["humidity"],
"sunrise": DateTimeFormatter.format_time(datetime.fromtimestamp(period["sunrise"])),
"sunset": DateTimeFormatter.format_time(datetime.fromtimestamp(period["sunset"])),
"pressure": period["pressure"],
"period": DateTimeFormatter.format_date(period_dt),
}
class PollenService:
"""Service for fetching pollen data."""
@staticmethod
@error_handler
async def fetch_pollen(zip_code: str) -> List[Dict[str, Any]]:
"""Fetch pollen data from pollen.com API."""
if zip_code in pollen_cache:
return pollen_cache[zip_code]
url = f"https://www.pollen.com/api/forecast/current/pollen/{zip_code}"
headers = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/123.0.0.0 Safari/537.36"
),
"Accept": "application/json, text/plain, */*",
"Referer": url,
"Cookie": f"geo={zip_code}",
}
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
response = await client.get(url, headers=headers)
response.raise_for_status()
raw_data = response.json()
pollen_data = PollenData(raw_data)
result = [{
"forecast_date": pollen_data.forecast_date,
"periods": pollen_data.periods,
}]
pollen_cache[zip_code] = result
return result
class DataAggregator:
"""Service for aggregating weather and pollen data."""
@staticmethod
def build_daily_data(
date: str,
pollen_periods: Dict[str, Dict[str, Any]],
weather_periods: Dict[str, Dict[str, Any]]
) -> Dict[str, Any]:
"""Build daily data combining weather and pollen information."""
daily_data = {} daily_data = {}
if date in pollen_periods: if date in pollen_periods:
daily_data["pollen"] = pollen_periods[date]["index"] daily_data["pollen"] = pollen_periods[date]["index"]
if date in weather_periods: if date in weather_periods:
weather_data = weather_periods[date] weather_data = weather_periods[date]
daily_data.update({ daily_data.update({
@ -74,121 +300,33 @@ def build_daily_data(date, pollen_periods, weather_periods):
"sunset": weather_data["sunset"], "sunset": weather_data["sunset"],
"pressure": weather_data["pressure"], "pressure": weather_data["pressure"],
}) })
return daily_data return daily_data
@staticmethod
def create_periods_lookup(
pollen_data: List[Dict[str, Any]],
weather_data: List[Dict[str, Any]]
) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Dict[str, Any]]]:
"""Create lookup dictionaries for pollen and weather periods."""
pollen_periods = {}
weather_periods = {}
def fallback_handler(func): # Process pollen data
@functools.wraps(func) if pollen_data and pollen_data[0].get("periods"):
async def wrapper(*args, **kwargs): pollen_periods = {
try: p["period"]: p for p in pollen_data[0]["periods"]
result = await func(*args, **kwargs)
except Exception as e:
logger.exception(e)
return []
return result
return wrapper
@fallback_handler
async def fetch_pollen(zipcode):
if zipcode in pollen_cache:
return pollen_cache[zipcode]
url = f"https://www.pollen.com/api/forecast/current/pollen/{zipcode}"
headers = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/123.0.0.0 Safari/537.36"
),
"Accept": "application/json, text/plain, */*",
"Referer": url,
"Cookie": f"geo={zipcode}",
} }
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url, headers=headers) # Process weather data
response.raise_for_status() if weather_data and weather_data[0].get("periods"):
data = response.json() weather_periods = {
result = [ p["period"]: p for p in weather_data[0]["periods"]
{
"forecast_date": format_datetime(
datetime.fromisoformat(data["ForecastDate"])
),
"periods": [
{
"index": int(d["Index"] / 12. * 100),
"period": relative_day_to_date(d["Type"]),
} }
for d in data["Location"]["periods"]
if d["Type"].lower().strip() in ["today", "tomorrow"]
],
}
]
pollen_cache[zipcode] = result
return result
# Add current weather data
@fallback_handler if weather_data and weather_data[0]:
async def fetch_weather(lat, lon, weather_api_key): data = weather_data[0]
cache_key = (lat, lon)
if cache_key in weather_cache:
return weather_cache[cache_key]
url = f"https://api.openweathermap.org/data/3.0/onecall?lat={lat}&lon={lon}&appid={weather_api_key}&units=imperial"
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(url)
response.raise_for_status()
data = response.json()
current, periods = data["current"], data["daily"][:2]
result = [
{
"forecast_date": format_datetime(datetime.fromtimestamp(current["dt"])),
"current_temp": int(round(current["temp"])),
"current_feels_like": int(round(current["feels_like"])),
"current_humidity": current["humidity"],
"sunrise": format_time(datetime.fromtimestamp(current["sunrise"])),
"sunset": format_time(datetime.fromtimestamp(current["sunset"])),
"current_pressure": current["pressure"],
"current_desc": current["weather"][0]["description"],
"periods": [
{
"low": int(round(p["temp"]["min"])),
"high": int(round(p["temp"]["max"])),
"desc": p["weather"][0]["description"],
"humidity": p["humidity"],
"sunrise": format_time(datetime.fromtimestamp(p["sunrise"])),
"sunset": format_time(datetime.fromtimestamp(p["sunset"])),
"pressure": p["pressure"],
"period": format_date(datetime.fromtimestamp(p["dt"])),
}
for p in periods
],
}
]
weather_cache[cache_key] = result
return result
@app.get("/")
async def read_root(token: str):
if token != CONFIG["auth_token"]:
raise HTTPException(status_code=403, detail="unauthorized")
[
pollen,
weather,
] = await asyncio.gather(
fetch_pollen(CONFIG["zip"]),
fetch_weather(CONFIG["lat"], CONFIG["lon"], CONFIG["weather_api_key"]),
)
pollen_periods = {p["period"]: p for p in pollen[0]["periods"]} if pollen and pollen[0]["periods"] else {}
weather_periods = {p["period"]: p for p in weather[0]["periods"]} if weather and weather[0]["periods"] else {}
# Add current weather data as a "current" period
if weather and weather[0]:
data = weather[0]
weather_periods["current"] = { weather_periods["current"] = {
"period": "current", "period": "current",
"temp": data["current_temp"], "temp": data["current_temp"],
@ -197,35 +335,79 @@ async def read_root(token: str):
"pressure": data["current_pressure"], "pressure": data["current_pressure"],
"desc": data["current_desc"], "desc": data["current_desc"],
"sunrise": data["sunrise"], "sunrise": data["sunrise"],
"sunset": data["sunset"] "sunset": data["sunset"],
} }
today_date = format_date(datetime.now()) return pollen_periods, weather_periods
tomorrow_date = format_date(datetime.now() + timedelta(days=1))
@app.get("/")
async def get_weather_pollen_report(token: str) -> Dict[str, Any]:
"""
Get weather and pollen report.
Args:
token: Authentication token
Returns:
Dictionary containing current, today, and tomorrow weather/pollen data
Raises:
HTTPException: If authentication fails
"""
# Validate authentication
if token != config.auth_token:
raise HTTPException(status_code=403, detail="Unauthorized")
# Fetch data concurrently
pollen_data, weather_data = await asyncio.gather(
PollenService.fetch_pollen(config.zip_code),
WeatherService.fetch_weather(config.latitude, config.longitude, config.weather_api_key),
)
# Create period lookups
pollen_periods, weather_periods = DataAggregator.create_periods_lookup(
pollen_data, weather_data
)
# Calculate date strings
now = datetime.now()
today_date = DateTimeFormatter.format_date(now)
tomorrow_date = DateTimeFormatter.format_date(now + timedelta(days=1))
# Build response
result = { result = {
"fetched_at": format_datetime(datetime.now()), "fetched_at": DateTimeFormatter.format_datetime(now),
"current": {}, "current": {},
"today": {}, "today": {},
"tomorrow": {}, "tomorrow": {},
} }
# Add current weather data
if "current" in weather_periods: if "current" in weather_periods:
weather_data = weather_periods["current"] current_weather = weather_periods["current"]
result["current"] = { result["current"] = {
"temp": weather_data["temp"], "temp": current_weather["temp"],
"feels_like": weather_data["feels_like"], "feels_like": current_weather["feels_like"],
"desc": weather_data["desc"], "desc": current_weather["desc"],
"humidity": weather_data["humidity"], "humidity": current_weather["humidity"],
"pressure": weather_data["pressure"] "pressure": current_weather["pressure"],
} }
today_data = build_daily_data(today_date, pollen_periods, weather_periods) # Add today's data
today_data = DataAggregator.build_daily_data(today_date, pollen_periods, weather_periods)
if today_data: if today_data:
result["today"] = today_data result["today"] = today_data
tomorrow_data = build_daily_data(tomorrow_date, pollen_periods, weather_periods) # Add tomorrow's data
tomorrow_data = DataAggregator.build_daily_data(tomorrow_date, pollen_periods, weather_periods)
if tomorrow_data: if tomorrow_data:
result["tomorrow"] = tomorrow_data result["tomorrow"] = tomorrow_data
return result return result
@app.get("/health")
async def health_check() -> Dict[str, str]:
"""Health check endpoint."""
return {"status": "healthy"}