Examples
Real-world examples demonstrating errortools usage.
Web API Error Handling
import requests
from errortools import ContextException, ignore
from errortools.logging import logger
class APIError(ContextException):
code = 5000
default_detail = "API request failed"
def fetch_user(user_id: int) -> dict:
try:
response = requests.get(f"/api/users/{user_id}")
response.raise_for_status()
return response.json()
except requests.HTTPError as e:
raise (
APIError(f"Failed to fetch user {user_id}")
.with_context(
user_id=user_id,
status_code=e.response.status_code,
endpoint=e.response.url
)
.with_cause(e)
)
# Usage with logging
req_log = logger.bind(request_id="abc-123")
with logger.catch(APIError):
user = fetch_user(42)
req_log.info("User fetched successfully")
Database Connection with Retry
import asyncio
from errortools import retry, timeout
from errortools.logging import logger
@retry(times=3, on=(ConnectionError, TimeoutError), delay=2.0)
@timeout(10.0)
async def connect_db(host: str, port: int) -> Connection:
logger.info("Connecting to {}:{}", host, port)
conn = await asyncio.open_connection(host, port)
logger.success("Connected to database")
return conn
# Automatically retries up to 3 times with 2s delay
# Times out after 10 seconds per attempt
conn = await connect_db("localhost", 5432)
Batch Processing with Error Collection
from errortools.future import ExceptionCollector
from errortools.logging import logger
def process_batch(items: list[dict]) -> None:
collector = ExceptionCollector()
for item in items:
collector.catch(process_item, item)
if collector.has_errors:
logger.error(
"Failed to process {} out of {} items",
collector.count,
len(items)
)
# Continue or raise based on requirements
if collector.count > len(items) * 0.1: # >10% failure
collector.raise_all("Batch processing failed")
else:
logger.success("All {} items processed", len(items))
def process_item(item: dict) -> None:
# Process individual item
validate(item)
save_to_db(item)
Configuration Loading with Validation
from errortools import (
BaseErrorCodes,
ignore,
ConfigurationWarning
)
from errortools.logging import logger
def load_config(path: str) -> dict:
with ignore(FileNotFoundError) as err:
with open(path) as f:
config = json.load(f)
if err.be_ignore:
ConfigurationWarning.emit(f"Config file {path} not found, using defaults")
config = get_default_config()
# Validate required fields
required = ["host", "port", "database"]
missing = [k for k in required if k not in config]
if missing:
raise BaseErrorCodes.configuration_error(
f"Missing required config: {', '.join(missing)}"
)
logger.info("Configuration loaded from {}", path)
return config
Hot Path Optimization
from errortools.future import super_fast_ignore
def process_large_dataset(data: list[dict]) -> list[str]:
results = []
for item in data:
# Use super_fast_ignore in tight loops
with super_fast_ignore(KeyError, TypeError):
# Optional field processing
value = transform(item.get("optional_field"))
results.append(value)
return results
Structured Logging in Microservices
from errortools.logging import logger
from errortools import ignore
# Service initialization
logger.add("logs/service.log", rotation=50_000_000, retention=10)
logger.add(sys.stderr, level="WARNING")
def handle_request(request_id: str, user_id: int):
# Bind request context
req_log = logger.bind(
request_id=request_id,
user_id=user_id,
service="user-service"
)
req_log.info("Request started")
try:
# Process request
result = process_user_request(user_id)
req_log.success("Request completed in {ms}ms", ms=result.duration)
return result
except Exception:
req_log.exception("Request failed")
raise
Deprecation Management
from errortools import deprecated, ignore_warns, DeprecatedWarning
@deprecated("Use new_api_v2() instead. Will be removed in v3.0")
def old_api():
return "legacy result"
# Suppress deprecation warnings in tests
with ignore_warns(DeprecatedWarning):
result = old_api() # No warning emitted
Exception Type Conversion
from errortools import reraise
class ServiceError(Exception):
pass
def call_external_service():
with reraise((ConnectionError, TimeoutError), ServiceError):
# External library calls
response = external_lib.request()
# All connection/timeout errors converted to ServiceError
return response
Error Caching for Expensive Operations
from errortools import error_cache, ignore
@error_cache(maxsize=128)
def validate_user_token(token: str) -> dict:
# Expensive validation
if not is_valid_format(token):
raise ValueError("Invalid token format")
response = auth_service.validate(token)
if not response.ok:
raise PermissionError("Token validation failed")
return response.data
# First call with invalid token raises and caches
with ignore(ValueError):
validate_user_token("bad-token")
# Second call returns cached exception immediately
with ignore(ValueError):
validate_user_token("bad-token") # No network call
print(validate_user_token.cache_info())
# CacheInfo(hits=1, misses=1, maxsize=128, currsize=1)
Multi-Level Error Handling
from errortools import (
ContextException,
ignore_subclass)
from errortools.logging import logger
class DataError(ContextException):
code = 6000
default_detail = "Data processing error"
class ValidationError(DataError):
code = 6001
default_detail = "Validation failed"
class TransformError(DataError):
code = 6002
default_detail = "Transform failed"
def process_pipeline(data: dict):
# Catch all DataError subclasses
with ignore_subclass(DataError) as err:
validate_data(data)
transform_data(data)
save_data(data)
if err.be_ignore:
logger.error(
"Pipeline failed: {} - {}",
err.name,
err.exception
)
# Handle error or re-raise
return None
return data