myfy-data¶
Database/ORM module for myfy with async SQLAlchemy and REQUEST-scoped sessions.
Overview¶
myfy-data provides seamless database integration built on SQLAlchemy 2.0+, with async/await support, automatic session management, and deep integration with myfy's dependency injection system.
Installation¶
Dependencies:
myfy-core- Core frameworksqlalchemy[asyncio]- SQLAlchemy with async supportaiosqlite- Async SQLite driver (default)alembic- Database migrations
Key Features¶
Async SQLAlchemy 2.0+¶
Modern async/await database operations:
from myfy.data import DataModule
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
@route.get("/users/{user_id}")
async def get_user(user_id: int, session: AsyncSession) -> dict:
# session automatically injected
result = await session.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
return {"id": user.id, "name": user.name}
REQUEST-Scoped Sessions¶
Each HTTP request automatically gets its own database session:
@route.post("/users")
async def create_user(body: CreateUserDTO, session: AsyncSession) -> User:
user = User(**body.dict())
session.add(user)
await session.commit()
return user
# session automatically closed after response
Connection Pooling¶
Production-ready connection management with configurable pooling:
from myfy.data import DatabaseSettings
settings = DatabaseSettings(
database_url="postgresql+asyncpg://user:pass@localhost/db",
pool_size=10,
max_overflow=20,
pool_timeout=30.0
)
Zero Configuration¶
Works out-of-the-box with SQLite:
from myfy.core import Application
from myfy.data import DataModule
app = Application(auto_discover=False)
app.add_module(DataModule()) # Uses SQLite by default!
Auto Table Creation (Development)¶
Automatically create tables from your models in development:
from myfy.data import DataModule, Base
from sqlalchemy import Column, Integer, String
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(100))
app.add_module(DataModule(
auto_create_tables=True, # Creates tables on startup
metadata=Base.metadata,
))
Note: auto_create_tables is blocked in production environments. Use Alembic migrations for production deployments.
Quick Start¶
Basic Application¶
from myfy.core import Application
from myfy.data import DataModule, Base
from myfy.web import route, WebModule
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import declarative_base
from starlette.exceptions import HTTPException
# Define model
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(100))
email = Column(String(100))
# Define routes
@route.get("/users")
async def list_users(session: AsyncSession) -> list[dict]:
result = await session.execute(select(User))
users = result.scalars().all()
return [{"id": u.id, "name": u.name, "email": u.email} for u in users]
@route.get("/users/{user_id}")
async def get_user(user_id: int, session: AsyncSession) -> dict:
result = await session.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404)
return {"id": user.id, "name": user.name, "email": user.email}
# Create application with auto table creation (development only)
app = Application(auto_discover=False)
app.add_module(DataModule(
auto_create_tables=True, # Auto-creates tables on startup
metadata=Base.metadata,
))
app.add_module(WebModule())
if __name__ == "__main__":
import asyncio
asyncio.run(app.run())
Run with:
PostgreSQL Application¶
from myfy.core import Application
from myfy.data import DataModule, DatabaseSettings
# Configure PostgreSQL
settings = DatabaseSettings(
database_url="postgresql+asyncpg://user:password@localhost/mydb"
)
app = Application(auto_discover=False)
app.add_module(DataModule(settings=settings))
app.add_module(WebModule())
Database Configuration¶
DatabaseSettings¶
Configure database connection and pooling:
from myfy.data import DatabaseSettings
settings = DatabaseSettings(
# Connection
database_url="postgresql+asyncpg://user:pass@localhost/db",
# Connection Pool
pool_size=5, # Number of persistent connections
max_overflow=10, # Additional connections allowed
pool_timeout=30.0, # Timeout for getting connection
pool_recycle=3600, # Recycle connections after 1 hour
pool_pre_ping=True, # Test connections before use
# Debugging
echo=False, # Log SQL statements
echo_pool=False, # Log pool checkout/checkin
# SQLite-specific
sqlite_check_same_thread=False # Required for async SQLite
)
Environment Variables¶
Configure via environment variables with MYFY_DATA_ prefix:
# .env
MYFY_DATA_DATABASE_URL=postgresql+asyncpg://user:pass@localhost/db
MYFY_DATA_POOL_SIZE=10
MYFY_DATA_MAX_OVERFLOW=20
MYFY_DATA_ECHO=true
Supported Databases¶
SQLite (default):
database_url="sqlite+aiosqlite:///./myfy.db" # File
database_url="sqlite+aiosqlite:///:memory:" # In-memory
PostgreSQL:
MySQL:
Note: Install appropriate async driver:
- SQLite: aiosqlite (included)
- PostgreSQL: asyncpg
- MySQL: aiomysql
Models¶
Defining Models¶
Use SQLAlchemy's declarative base:
from myfy.data import Base, Column, Integer, String
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=False)
email = Column(String(100), unique=True, nullable=False)
age = Column(Integer)
Relationships¶
Define relationships between models:
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class Post(Base):
__tablename__ = "posts"
id = Column(Integer, primary_key=True)
title = Column(String(200))
user_id = Column(Integer, ForeignKey("users.id"))
# Relationship
user = relationship("User", back_populates="posts")
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(100))
# Relationship
posts = relationship("Post", back_populates="user")
CRUD Operations¶
Create¶
@route.post("/users")
async def create_user(body: CreateUserDTO, session: AsyncSession) -> dict:
user = User(name=body.name, email=body.email)
session.add(user)
await session.commit()
await session.refresh(user) # Get generated ID
return {"id": user.id, "name": user.name}
Read¶
from sqlalchemy import select
@route.get("/users/{user_id}")
async def get_user(user_id: int, session: AsyncSession) -> dict:
result = await session.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404)
return {"id": user.id, "name": user.name}
Update¶
@route.put("/users/{user_id}")
async def update_user(
user_id: int,
body: UpdateUserDTO,
session: AsyncSession
) -> dict:
result = await session.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404)
user.name = body.name
user.email = body.email
await session.commit()
return {"id": user.id, "name": user.name}
Delete¶
@route.delete("/users/{user_id}", status_code=204)
async def delete_user(user_id: int, session: AsyncSession) -> None:
result = await session.execute(
select(User).where(User.id == user_id)
)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404)
await session.delete(user)
await session.commit()
Queries¶
Filtering¶
from sqlalchemy import select, and_, or_
# Simple filter
result = await session.execute(
select(User).where(User.age >= 18)
)
# Multiple conditions (AND)
result = await session.execute(
select(User).where(
and_(
User.age >= 18,
User.email.like("%@example.com")
)
)
)
# OR conditions
result = await session.execute(
select(User).where(
or_(
User.age < 18,
User.age > 65
)
)
)
Ordering¶
from sqlalchemy import desc
# Ascending
result = await session.execute(
select(User).order_by(User.name)
)
# Descending
result = await session.execute(
select(User).order_by(desc(User.created_at))
)
Pagination¶
@route.get("/users")
async def list_users(
page: int = 1,
per_page: int = 20,
session: AsyncSession = None
) -> dict:
offset = (page - 1) * per_page
result = await session.execute(
select(User)
.offset(offset)
.limit(per_page)
)
users = result.scalars().all()
# Get total count
total_result = await session.execute(
select(func.count()).select_from(User)
)
total = total_result.scalar()
return {
"items": users,
"page": page,
"per_page": per_page,
"total": total
}
Joins¶
from sqlalchemy import join
# Explicit join
result = await session.execute(
select(User, Post)
.join(Post, User.id == Post.user_id)
.where(Post.published == True)
)
# Using relationship
result = await session.execute(
select(User)
.join(User.posts)
.where(Post.published == True)
)
Transactions¶
Explicit Transactions¶
@route.post("/transfer")
async def transfer_money(
from_id: int,
to_id: int,
amount: float,
session: AsyncSession
) -> dict:
async with session.begin_nested():
# Deduct from sender
sender = await session.get(Account, from_id)
sender.balance -= amount
# Add to receiver
receiver = await session.get(Account, to_id)
receiver.balance += amount
# Commit happens automatically on context exit
await session.commit()
return {"status": "success"}
Rollback on Error¶
@route.post("/create-user-with-profile")
async def create_user_with_profile(
body: CreateUserDTO,
session: AsyncSession
) -> dict:
try:
# Create user
user = User(**body.dict())
session.add(user)
await session.flush() # Get user.id without committing
# Create profile
profile = UserProfile(user_id=user.id, bio="...")
session.add(profile)
await session.commit()
return {"id": user.id}
except Exception as e:
await session.rollback()
raise HTTPException(status_code=500, detail=str(e))
Migrations¶
Initialize Alembic¶
from myfy.data import MigrationManager
manager = MigrationManager()
manager.init() # Creates alembic.ini and alembic/ directory
Create Migration¶
# Auto-generate migration from model changes
alembic revision --autogenerate -m "Add users table"
# Or use MigrationManager
manager = MigrationManager()
manager.revision("Add users table", autogenerate=True)
Apply Migrations¶
# Upgrade to latest
alembic upgrade head
# Or use MigrationManager
manager = MigrationManager()
manager.upgrade()
Configure Alembic¶
Update alembic/env.py to use your models:
from myfy.data import data_module, Base
from myapp.models import User, Post # Import all models
# Set target_metadata to your Base
target_metadata = Base.metadata
# Get database URL from settings
config.set_main_option(
"sqlalchemy.url",
data_module._settings.database_url
)
Session Management¶
Manual Session Creation¶
from myfy.data import SessionFactory
async def some_background_task(session_factory: SessionFactory):
async with session_factory.session_context() as session:
# Use session
user = User(name="Alice")
session.add(user)
# Automatically commits on exit
Direct Engine Access¶
from myfy.data import data_module
engine = data_module.get_engine()
async with engine.connect() as conn:
result = await conn.execute("SELECT 1")
print(result.scalar())
Best Practices¶
Use REQUEST-Scoped Sessions¶
# ✓ Good - Automatic session management
@route.get("/users")
async def list_users(session: AsyncSession) -> list[User]:
result = await session.execute(select(User))
return result.scalars().all()
# ✗ Bad - Manual session creation in routes
@route.get("/users")
async def list_users(session_factory: SessionFactory) -> list[User]:
async with session_factory.session_context() as session:
result = await session.execute(select(User))
return result.scalars().all()
Always Use async/await¶
# ✓ Good - Async operations
result = await session.execute(select(User))
users = result.scalars().all()
# ✗ Bad - Synchronous operations
# session.query(User).all() # Don't use sync query() API
Use Type Hints¶
# ✓ Good - Type hints for IDE support
from sqlalchemy.ext.asyncio import AsyncSession
@route.get("/users")
async def list_users(session: AsyncSession) -> list[dict]:
...
# ✗ Bad - No type hints
@route.get("/users")
async def list_users(session):
...
Initialize Tables on Startup¶
# ✓ Good - Use auto_create_tables for development
app.add_module(DataModule(
auto_create_tables=True,
metadata=Base.metadata,
))
# ✓ Good - Use Alembic migrations for production
# alembic upgrade head
# ✗ Bad - Manual table creation scripts
Use Migrations for Schema Changes¶
# ✓ Good - Version control schema changes
# alembic revision --autogenerate -m "Add email column"
# alembic upgrade head
# ✗ Bad - Manual schema modifications
# ALTER TABLE users ADD COLUMN email VARCHAR(100);
Common Patterns¶
Repository Pattern¶
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
class UserRepository:
def __init__(self, session: AsyncSession):
self.session = session
async def get_by_id(self, user_id: int) -> User | None:
result = await self.session.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def list_all(self) -> list[User]:
result = await self.session.execute(select(User))
return result.scalars().all()
async def create(self, user: User) -> User:
self.session.add(user)
await self.session.commit()
await self.session.refresh(user)
return user
@route.get("/users/{user_id}")
async def get_user(user_id: int, session: AsyncSession) -> dict:
repo = UserRepository(session)
user = await repo.get_by_id(user_id)
if not user:
raise HTTPException(status_code=404)
return {"id": user.id, "name": user.name}
Service Layer¶
from myfy.core import provider, SINGLETON
class UserService:
def __init__(self, settings: Settings):
self.settings = settings
async def get_user(self, user_id: int, session: AsyncSession) -> User:
repo = UserRepository(session)
user = await repo.get_by_id(user_id)
if not user:
raise ValueError(f"User {user_id} not found")
return user
@provider(scope=SINGLETON)
def user_service(settings: Settings) -> UserService:
return UserService(settings)
@route.get("/users/{user_id}")
async def get_user(
user_id: int,
service: UserService,
session: AsyncSession
) -> dict:
user = await service.get_user(user_id, session)
return {"id": user.id, "name": user.name}
Examples¶
Complete example in examples/frontend-demo/ with:
- Todo CRUD operations
- SQLAlchemy models
- REQUEST-scoped sessions
- Database initialization
API Reference¶
DataModule¶
class DataModule:
def __init__(
self,
settings: DatabaseSettings | None = None,
auto_create_tables: bool = False,
metadata: MetaData | None = None,
)
"""
Args:
settings: Custom database settings (defaults to environment)
auto_create_tables: Create tables on startup (dev/test only)
metadata: SQLAlchemy MetaData for auto_create_tables
"""
@property
def name(self) -> str # Returns "data"
@property
def requires(self) -> list[type] # Returns []
@property
def provides(self) -> list[type] # Returns [IDataProvider]
def configure(self, container: Container) -> None
def extend(self, container: Container) -> None
def finalize(self, container: Container) -> None
async def start(self) -> None # Creates tables if auto_create_tables=True
async def stop(self) -> None
def get_engine(self) -> AsyncEngine
def get_session_factory(self) -> SessionFactory
DatabaseSettings¶
class DatabaseSettings(BaseSettings):
database_url: str = "sqlite+aiosqlite:///./myfy.db"
pool_size: int = 5
max_overflow: int = 10
pool_timeout: float = 30.0
pool_recycle: int = 3600
pool_pre_ping: bool = True
echo: bool = False
echo_pool: bool = False
sqlite_check_same_thread: bool = False
environment: str = "development" # "development", "test", or "production"
Environment variables use MYFY_DATA_ prefix (e.g., MYFY_DATA_ENVIRONMENT=production).
SessionFactory¶
class SessionFactory:
def create_session(self) -> AsyncSession
@asynccontextmanager
async def session_context(self) -> AsyncIterator[AsyncSession]
Next Steps¶
- Add Authentication: Install
myfy-authfor user authentication - Add Testing: Learn how to test database operations
- Performance: Optimize queries with indexes and eager loading
- Monitoring: Add query logging and performance tracking