Commit 8d7cb029 authored by ZeinabRm13

Refactor
# Stage 1: Base image with Python
FROM python:3.12-slim AS base
# Set working directory inside the container
WORKDIR /app
# Prevent Python from writing .pyc files and ensure output is sent straight to the terminal
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
# Stage 2: Install system dependencies, including the ODBC driver
FROM base AS system-deps
# Install build-essential tools and driver dependencies
RUN apt-get update && apt-get install -y curl gnupg build-essential
# Add Microsoft GPG key and repository for the ODBC driver
RUN curl -fsSL https://packages.microsoft.com/keys/microsoft.asc | gpg --dearmor -o /usr/share/keyrings/microsoft-prod.gpg
RUN curl -fsSL "https://packages.microsoft.com/config/ubuntu/22.04/prod.list" > /etc/apt/sources.list.d/mssql-release.list
# Install the ODBC driver and required development headers
RUN apt-get update && \
apt-get -y install unixodbc-dev && \
ACCEPT_EULA=Y apt-get -y install msodbcsql17
# Clean up apt cache to keep image size down
RUN apt-get clean && rm -rf /var/lib/apt/lists/*
# Stage 3: Install Python dependencies
FROM system-deps AS python-deps
# Copy only the requirements file to leverage Docker cache
COPY requirements.txt .
# Install Python packages
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r requirements.txt
# Stage 4: Production image with application code
FROM python-deps AS production
# Copy the application source code from the 'src' directory into the container's working directory
COPY ./chart_analyzer/src .
# Expose the port the app runs on
EXPOSE 8000
# Define the command to run the application
# Uvicorn will look for the 'app' object in the 'main.py' file at the root of our WORKDIR
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires Python>=3.9 or the backports.zoneinfo library, plus the tzdata library.
# Any required deps can be installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
sqlalchemy.url = postgresql+asyncpg://chart_analyzer_user:chartanalyzer13@localhost:5432/chart_analyzer
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
Generic single-database configuration.
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
from alembic import context
import asyncio
import sys
import os
# Add the project root to sys.path to import your models
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if project_root not in sys.path:
sys.path.insert(0, project_root)
# Import your Base from your app
from src.infrastructure.adapters.sqlserver import Base
# ------------------------------
# Configure Alembic
# ------------------------------
config = context.config
# Setup logging (if you have it in alembic.ini)
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Set target metadata for autogenerate
target_metadata = Base.metadata
# ------------------------------
# Offline Mode (no DB connection)
# ------------------------------
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
compare_type=True,
compare_server_default=True,
)
with context.begin_transaction():
context.run_migrations()
# ------------------------------
# Online Mode (with async engine)
# ------------------------------
async def run_async_migrations() -> None:
"""Create and run async migrations."""
connectable: AsyncEngine = create_async_engine(
config.get_main_option("sqlalchemy.url"),
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
# ✅ Correct: Configure context inside run_sync
await connection.run_sync(lambda conn: context.configure(
connection=conn,
target_metadata=target_metadata,
compare_type=True,
compare_server_default=True,
))
# ✅ Correct: Wrap run_migrations in lambda to avoid extra arg
await connection.run_sync(lambda conn: context.run_migrations())
await connectable.dispose()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
asyncio.run(run_async_migrations())
# ------------------------------
# Choose mode
# ------------------------------
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
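
With env.py wired for the async engine, migrations are normally run with the Alembic CLI (e.g. `alembic upgrade head`), but the same thing can be done from Python via Alembic's command API. A minimal sketch, assuming alembic.ini sits at the project root:

from alembic import command
from alembic.config import Config

def upgrade_to_head(ini_path: str = "alembic.ini") -> None:
    # Equivalent to running `alembic upgrade head` next to alembic.ini
    command.upgrade(Config(ini_path), "head")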
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}
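
For reference, a revision generated from this template would look roughly like the sketch below; the revision identifier, table, and columns are purely illustrative, not an actual migration from this repository.

"""create users table (illustrative example)"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic (hypothetical values)
revision: str = "0001_create_users"
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None

def upgrade() -> None:
    op.create_table(
        "users",
        sa.Column("id", sa.String(36), primary_key=True),
        sa.Column("email", sa.String(255), unique=True, nullable=False),
        sa.Column("password_hash", sa.String(255), nullable=False),
    )

def downgrade() -> None:
    op.drop_table("users")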
version: '3.8'
services:
# Application Service (FastAPI)
app:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
volumes:
# Mount your local 'src' directory to the '/app' directory in the container
- ./chart_analyzer/src:/app
depends_on:
- db
environment:
# The hostname 'db' is resolved by Docker's internal networking
- DATABASE_URL=postgresql+asyncpg://user:password@db:5432/ChartAnalyzer
- JWT_SECRET=a_very_secret_key
- JWT_ALGORITHM=HS256
- JWT_EXPIRE_MINUTES=30
# Overriding the CMD to enable --reload for development
command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
# Database Service (PostgreSQL)
db:
image: postgres:13-alpine
ports:
- "5432:5432"
environment:
POSTGRES_USER: "user"
POSTGRES_PASSWORD: "password"
POSTGRES_DB: "ChartAnalyzer"
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
# Named volume to persist database data across container restarts
postgres_data:
#!/bin/bash
# Project root
PROJECT_DIR="chart_analyzer"
mkdir -p "$PROJECT_DIR"/{src,tests/{unit,integration,e2e}}
# Domain layer
mkdir -p "$PROJECT_DIR"/src/domain/{entities,value_objects,ports,services,exceptions}
# Application layer
mkdir -p "$PROJECT_DIR"/src/application/{use_cases,services}
# Infrastructure layer
mkdir -p "$PROJECT_DIR"/src/infrastructure/{adapters,api/fastapi,services}
# Create empty Python files with __init__.py
touch "$PROJECT_DIR"/src/{domain,application,infrastructure}/__init__.py
touch "$PROJECT_DIR"/src/domain/{entities,value_objects,ports,services,exceptions}/__init__.py
touch "$PROJECT_DIR"/src/application/{use_cases,services}/__init__.py
touch "$PROJECT_DIR"/src/infrastructure/{adapters,api,services}/__init__.py
touch "$PROJECT_DIR"/src/infrastructure/api/fastapi/__init__.py
# Main domain files
touch "$PROJECT_DIR"/src/domain/entities/{chart_image,chart_analysis}.py
touch "$PROJECT_DIR"/src/domain/value_objects/trend_summary.py
touch "$PROJECT_DIR"/src/domain/ports/{image_storage,model_integration}.py
touch "$PROJECT_DIR"/src/domain/services/chart_validation.py
touch "$PROJECT_DIR"/src/domain/exceptions/domain_errors.py
# Main application files
touch "$PROJECT_DIR"/src/application/use_cases/{upload_image,general_analysis,specific_question,continue_convo}.py
touch "$PROJECT_DIR"/src/application/services/conversation_ctx.py
# Main infrastructure files
touch "$PROJECT_DIR"/src/infrastructure/adapters/{mongodb_image_storage,model_adapter}.py
touch "$PROJECT_DIR"/src/infrastructure/api/fastapi/{routes,schemas}.py
touch "$PROJECT_DIR"/src/infrastructure/services/image_preprocessor.py
# Entry point and test files
touch "$PROJECT_DIR"/main.py
touch "$PROJECT_DIR"/tests/{unit,integration,e2e}/__init__.py
echo "Project structure created at: $PROJECT_DIR"
# Web Framework
fastapi
uvicorn[standard]
# Database and ORM
sqlalchemy
pydantic_settings
alembic
# PostgreSQL Driver
psycopg2-binary
asyncpg
# Authentication
passlib[bcrypt]
python-jose[cryptography]
# Other
python-multipart
# src/application/__init__.py
from .services.authentication_service import AuthService
from .ports.authentication_service_port import AuthServicePort
__all__ = ["AuthService","AuthServicePort"]
from .authentication import (
    RegisterRequestDTO,
    LoginRequestDTO,
    UserResponseDTO,
    TokenResponseDTO
)
from .analysis import AnalysisRequestDTO, AnalysisResponseDTO
from pydantic import BaseModel
class AnalysisRequestDTO(BaseModel):
image_bytes: bytes # Or UploadFile for FastAPI
question: str
class AnalysisResponseDTO(BaseModel):
answer: str
analysis_id: str
\ No newline at end of file
# src/application/dtos/authentication.py
from pydantic import BaseModel, EmailStr
class RegisterRequestDTO(BaseModel):
email: EmailStr
password: str
class LoginRequestDTO(BaseModel):
email: EmailStr
password: str
class UserResponseDTO(BaseModel):
id: str
email: str
is_active: bool
class TokenResponseDTO(BaseModel):
access_token: str
token_type: str = "bearer"
from .authentication_service_port import AuthServicePort
# src/application/ports/authentication_service_port.py
from abc import ABC, abstractmethod

from application.dtos.authentication import (
    RegisterRequestDTO,
    LoginRequestDTO,
    UserResponseDTO,
    TokenResponseDTO
)


class AuthServicePort(ABC):
    @abstractmethod
    async def register(self, request: RegisterRequestDTO) -> UserResponseDTO: ...

    @abstractmethod
    async def login(self, request: LoginRequestDTO) -> TokenResponseDTO:  # Returns a JWT access token
        ...
from .authentication_service import AuthService
# src/application/services/authentication_service.py
import uuid
from datetime import datetime, timezone, timedelta

from jose import jwt
from passlib.context import CryptContext

from domain.ports import UserRepositoryPort
from domain.entities.user import User
from application.ports import AuthServicePort
from application.dtos.authentication import (
    RegisterRequestDTO,
    LoginRequestDTO,
    UserResponseDTO,
    TokenResponseDTO
)


class AuthService(AuthServicePort):
    def __init__(
        self,
        user_repo: UserRepositoryPort,
        secret_key: str,
        algorithm: str = "HS256",
        expires_minutes: int = 30
    ):
        self._user_repo = user_repo
        self._secret_key = secret_key
        self._algorithm = algorithm
        self._expires_minutes = expires_minutes
        self._pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

    async def register(self, request: RegisterRequestDTO) -> UserResponseDTO:
        if await self._user_repo.get_by_email(request.email):
            raise ValueError("Email already registered")
        user = User(
            id=str(uuid.uuid4()),
            email=request.email,
            password_hash=self._hash_password(request.password)
        )
        await self._user_repo.create_user(user)
        return self._user_to_dto(user)

    async def login(self, request: LoginRequestDTO) -> TokenResponseDTO:
        user = await self._user_repo.get_by_email(request.email)
        if not user or not self._verify_password(request.password, user.password_hash):
            raise ValueError("Invalid credentials")
        return TokenResponseDTO(
            access_token=self._create_access_token(user.email),
            token_type="bearer"
        )

    def _hash_password(self, password: str) -> str:
        return self._pwd_context.hash(password)

    def _verify_password(self, plain_password: str, hashed_password: str) -> bool:
        return self._pwd_context.verify(plain_password, hashed_password)

    def _create_access_token(self, email: str) -> str:
        expires = datetime.now(timezone.utc) + timedelta(minutes=self._expires_minutes)
        return jwt.encode(
            {"sub": email, "exp": expires},
            self._secret_key,
            algorithm=self._algorithm
        )

    def _user_to_dto(self, user: User) -> UserResponseDTO:
        return UserResponseDTO(
            id=user.id,
            email=user.email,
            is_active=user.is_active
        )
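AuthService only issues tokens; nothing above validates them on incoming requests. A minimal sketch of the decoding side using the same python-jose dependency (the helper name and the `config` import are assumptions, not existing project code):

from jose import jwt, JWTError
from config import settings

def email_from_token(token: str) -> str | None:
    # Returns the "sub" claim if the token is valid and unexpired, otherwise None
    try:
        payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
    except JWTError:
        return None
    return payload.get("sub")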
import uuid

from domain.entities import ChartAnalysis
from domain.ports import AnalysisRepositoryPort, ChartsRepositoryPort
class AnalyzeChartUseCase:
def __init__(
self,
charts_repo: ChartsRepositoryPort,
analysis_repo: AnalysisRepositoryPort
):
self._image_storage = charts_repo
self._analysis_repo = analysis_repo
    def execute(self, image_bytes: bytes, question: str) -> ChartAnalysis:
        # 1. Store image
        image_id = self._image_storage.save(image_bytes)
        # 2. Call your multimodal model (mock or real)
        answer = "Mocked analysis result"  # Replace with model call
        # 3. Save analysis
        analysis = ChartAnalysis(
            id=str(uuid.uuid4()),
            chart_image_id=image_id,
            question=question,
            answer=answer
        )
        self._analysis_repo.save(analysis)
        return analysis
from domain.entities import ChartImage
from domain.ports import ChartsRepositoryPort
import uuid
class UploadChartUseCase:
def __init__(self, image_repo: ChartsRepositoryPort):
self._image_repo = image_repo
def execute(self, image_bytes: bytes, user_id: str) -> str:
chart_image = ChartImage(
id=str(uuid.uuid4()),
user_id=user_id,
image_data=image_bytes
)
return self._image_repo.save(chart_image) # Returns image ID
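
Because the use case depends only on ChartsRepositoryPort, it can be unit-tested without a database. A sketch of such a test with a hand-rolled fake repository (FakeChartsRepository and the test are illustrative, not part of the tests/ tree; the module path follows the import used in the routes file):

from domain.entities import ChartImage
from domain.ports import ChartsRepositoryPort
from application.use_cases.upload_chart import UploadChartUseCase

class FakeChartsRepository(ChartsRepositoryPort):
    def __init__(self):
        self.saved: list[ChartImage] = []

    def save(self, image: ChartImage) -> str:
        self.saved.append(image)
        return image.id

def test_upload_returns_the_stored_image_id():
    repo = FakeChartsRepository()
    image_id = UploadChartUseCase(repo).execute(b"fake-png-bytes", user_id="user-1")
    assert repo.saved[0].id == image_id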
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
DATABASE_URL: str = "postgresql+asyncpg://chart_analyzer_user:chartanalyzer13@localhost:5432/chart_analyzer"
JWT_SECRET: str = "a_very_secret_key"
JWT_ALGORITHM: str = "HS256"
JWT_EXPIRE_MINUTES: int = 30
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
settings = Settings()
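
Settings values resolve in the usual pydantic-settings order: real environment variables win over the .env file, which wins over the defaults declared above. A small illustration (the environment variable is set only for the example, and the module is assumed to be importable as `config`):

import os
os.environ["JWT_EXPIRE_MINUTES"] = "60"

from config import Settings

assert Settings().JWT_EXPIRE_MINUTES == 60  # env var overrides the in-code default of 30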
from fastapi import Depends
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from application.services import AuthService
from domain.ports import UserRepositoryPort
from infrastructure.adapters.sqlserver.sql_user_repository import SqlUserRepository
from config import settings
engine = create_async_engine(settings.DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
yield session
def get_user_repository(session: AsyncSession = Depends(get_db_session)) -> UserRepositoryPort:
return SqlUserRepository(session)
def get_auth_service(
user_repo: UserRepositoryPort = Depends(get_user_repository),
) -> AuthService:
return AuthService(
user_repo=user_repo,
secret_key=settings.JWT_SECRET,
algorithm=settings.JWT_ALGORITHM,
expires_minutes=settings.JWT_EXPIRE_MINUTES,
)
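
The routes module below imports get_upload_use_case, which is not defined here. A minimal sketch of what that factory could look like, assuming SqlChartsRepository keeps its synchronous engine and therefore needs a sync driver URL (swapping asyncpg for psycopg2 is purely illustrative):

from application.use_cases.upload_chart import UploadChartUseCase
from infrastructure.adapters.sqlserver.sql_charts_repository import SqlChartsRepository

def get_upload_use_case() -> UploadChartUseCase:
    # Hypothetical factory; derive a synchronous URL for the blocking charts adapter
    sync_url = settings.DATABASE_URL.replace("+asyncpg", "+psycopg2")
    return UploadChartUseCase(SqlChartsRepository(sync_url))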
# src/domain/__init__.py
from .entities.chart_image import ChartImage
from .entities.chart_analysis import ChartAnalysis
from .ports.repositories.user_repository import UserRepositoryPort
from .ports.repositories.charts_repository import ChartsRepositoryPort
from .ports.repositories.analysis_repository import AnalysisRepositoryPort
# Optional: Explicitly declare exports
__all__ = ["ChartImage", "ChartAnalysis", "ChartsRepositoryPort", "AnalysisRepositoryPort","UserRepositoryPort"]
from .chart_image import ChartImage
from .chart_analysis import ChartAnalysis
from pydantic import BaseModel, Field
from datetime import datetime, timezone


class ChartAnalysis(BaseModel):
    id: str
    chart_image_id: str  # Links to ChartImage
    question: str
    answer: str
    # default_factory takes the timestamp at creation time, not once at import time
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
from pydantic import BaseModel, Field
from datetime import datetime, timezone


class ChartImage(BaseModel):
    id: str  # UUID
    user_id: str  # Owner of the chart
    image_data: bytes  # Or file path if storing externally
    uploaded_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
# src/domain/entities/user.py
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel, EmailStr, Field


class User(BaseModel):
    id: str  # UUID
    email: EmailStr
    password_hash: str  # Always hashed, never plaintext
    is_active: bool = True
    created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    last_login: Optional[datetime] = None
from .repositories.analysis_repository import AnalysisRepositoryPort
from .repositories.charts_repository import ChartsRepositoryPort
from .repositories.user_repository import UserRepositoryPort
from abc import ABC, abstractmethod
from domain.entities import ChartAnalysis
class AnalysisRepositoryPort(ABC):
@abstractmethod
def save(self, analysis: ChartAnalysis) -> None:
pass
from abc import ABC, abstractmethod
from domain.entities import ChartImage


class ChartsRepositoryPort(ABC):
    @abstractmethod
    def save(self, image: ChartImage) -> str:
        """Persist the chart image and return its ID."""
        ...
from abc import ABC, abstractmethod
from domain.entities.user import User
class UserRepositoryPort(ABC):
@abstractmethod
async def get_by_email(self, email: str) -> User | None: ...
@abstractmethod
async def create_user(self, user: User) -> None: ...
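
For tests, the port can be satisfied by a trivial in-memory adapter instead of SqlUserRepository; the class below is an illustrative sketch, not part of the repository:

class InMemoryUserRepository(UserRepositoryPort):
    """Dict-backed implementation of the port, handy for unit-testing AuthService."""

    def __init__(self) -> None:
        self._users: dict[str, User] = {}

    async def get_by_email(self, email: str) -> User | None:
        return self._users.get(email)

    async def create_user(self, user: User) -> None:
        self._users[user.email] = user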
from .sql_analysis_repository import SqlAnalysisRepository
from .sql_charts_repository import SqlChartsRepository
from .models import Base
# src/infrastructure/adapters/sqlserver/models.py
from sqlalchemy import Column, String, Boolean, DateTime, LargeBinary, ForeignKey, Text
from sqlalchemy.sql import func
from sqlalchemy.dialects.mssql import UNIQUEIDENTIFIER  # For SQL Server UUID
from sqlalchemy.orm import declarative_base, relationship

# 1. Create a Base class that all your models will inherit from.
Base = declarative_base()
class User(Base):
__tablename__ = 'users'
id = Column(UNIQUEIDENTIFIER, primary_key=True, server_default=func.newid())
email = Column(String(255), unique=True, nullable=False)
password_hash = Column(String(255), nullable=False) # Store only hashed passwords
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
last_login = Column(DateTime(timezone=True), nullable=True)
def __repr__(self):
return f"<User(id={self.id}, email={self.email})>"
class ChartImage(Base):
__tablename__ = 'chart_images'
id = Column(UNIQUEIDENTIFIER, primary_key=True, server_default=func.newid())
user_id = Column(UNIQUEIDENTIFIER, ForeignKey('users.id'), nullable=False)
image_data = Column(LargeBinary, nullable=False) # For storing binary data
uploaded_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationship
analyses = relationship("ChartAnalysis", back_populates="chart_image")
def __repr__(self):
return f"<ChartImage(id={self.id}, user_id={self.user_id})>"
class ChartAnalysis(Base):
__tablename__ = 'chart_analyses'
id = Column(UNIQUEIDENTIFIER, primary_key=True, server_default=func.newid())
chart_image_id = Column(UNIQUEIDENTIFIER, ForeignKey('chart_images.id'), nullable=False)
question = Column(Text, nullable=False)
answer = Column(Text, nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
# Relationship
chart_image = relationship("ChartImage", back_populates="analyses")
def __repr__(self):
return f"<ChartAnalysis(id={self.id}, chart_image_id={self.chart_image_id})>"
from sqlalchemy import create_engine, text
from domain.ports import AnalysisRepositoryPort
from domain.entities import ChartAnalysis


class SqlAnalysisRepository(AnalysisRepositoryPort):
    def __init__(self, connection_string: str):
        self._engine = create_engine(connection_string)

    def save(self, analysis: ChartAnalysis) -> None:
        # Insert the analysis row; text() with named parameters keeps the raw SQL driver-agnostic
        with self._engine.connect() as conn:
            conn.execute(
                text(
                    "INSERT INTO chart_analyses (id, chart_image_id, question, answer, created_at) "
                    "VALUES (:id, :chart_image_id, :question, :answer, :created_at)"
                ),
                {
                    "id": analysis.id,
                    "chart_image_id": analysis.chart_image_id,
                    "question": analysis.question,
                    "answer": analysis.answer,
                    "created_at": analysis.created_at,
                },
            )
            conn.commit()
from sqlalchemy import create_engine, text
from domain.ports import ChartsRepositoryPort
from domain.entities import ChartImage


class SqlChartsRepository(ChartsRepositoryPort):
    def __init__(self, connection_string: str):
        self._engine = create_engine(connection_string)

    def save(self, image: ChartImage) -> str:
        # Insert the image row and return its ID instead of a file path
        with self._engine.connect() as conn:
            conn.execute(
                text(
                    "INSERT INTO chart_images (id, user_id, image_data, uploaded_at) "
                    "VALUES (:id, :user_id, :image_data, :uploaded_at)"
                ),
                {"id": image.id, "user_id": image.user_id,
                 "image_data": image.image_data, "uploaded_at": image.uploaded_at},
            )
            conn.commit()
        return image.id
# src/infrastructure/adapters/sqlserver/sql_user_repository.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import EmailStr
from domain.ports import UserRepositoryPort
from domain.entities.user import User
from .models import User as UserModel  # ORM model, distinct from the domain entity


class SqlUserRepository(UserRepositoryPort):
    def __init__(self, session: AsyncSession):
        self._session = session

    async def get_by_email(self, email: EmailStr) -> User | None:
        # Query the ORM model, then map the row back to the domain entity
        result = await self._session.execute(
            select(UserModel).where(UserModel.email == email))
        row = result.scalar_one_or_none()
        if row is None:
            return None
        return User(id=str(row.id), email=row.email, password_hash=row.password_hash,
                    is_active=row.is_active, created_at=row.created_at, last_login=row.last_login)

    async def create_user(self, user: User) -> None:
        # Persist a new ORM row built from the domain entity
        self._session.add(UserModel(id=user.id, email=user.email,
                                    password_hash=user.password_hash, is_active=user.is_active))
        await self._session.commit()
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, status

from application.use_cases.upload_chart import UploadChartUseCase
from application.dtos import RegisterRequestDTO, LoginRequestDTO, TokenResponseDTO
from application.ports import AuthServicePort
from dependencies import get_auth_service, get_upload_use_case

router = APIRouter(tags=["auth"])
@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register(
request: RegisterRequestDTO,
auth_service: AuthServicePort = Depends(get_auth_service)
):
try:
        user = await auth_service.register(request)
return {"id": user.id, "email": user.email}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/login", response_model=TokenResponseDTO)
async def login(
request: LoginRequestDTO,
auth_service: AuthServicePort = Depends(get_auth_service)
):
try:
        # The service already returns a TokenResponseDTO
        return await auth_service.login(request)
except ValueError as e:
raise HTTPException(status_code=401, detail=str(e))
@router.post("/upload")
async def upload_chart(
file: UploadFile = File(...),
use_case: UploadChartUseCase = Depends(get_upload_use_case)
) -> dict:
image_bytes = await file.read()
image_id = use_case.execute(image_bytes, "user123") # Replace with real user ID
return {"image_id": image_id}
# src/infrastructure/api/fastapi/routes/auth.py
from fastapi import APIRouter, Depends, HTTPException, status

from application.dtos.authentication import (
    RegisterRequestDTO,
    LoginRequestDTO,
    TokenResponseDTO,
    UserResponseDTO,
)
from application.services import AuthService
from dependencies import get_auth_service

router = APIRouter(tags=["auth"])


@router.post("/register", response_model=UserResponseDTO, status_code=status.HTTP_201_CREATED)
async def register(
    request: RegisterRequestDTO,
    auth_service: AuthService = Depends(get_auth_service),
):
    try:
        return await auth_service.register(request)
    except ValueError as e:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))


@router.post("/login", response_model=TokenResponseDTO)
async def login(
    request: LoginRequestDTO,
    auth_service: AuthService = Depends(get_auth_service),
):
    try:
        return await auth_service.login(request)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=str(e),
            headers={"WWW-Authenticate": "Bearer"},
        )
# src/infrastructure/database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from typing import AsyncGenerator
from config import settings
# Create async engine
engine = create_async_engine(
settings.DATABASE_URL,
echo=True, # Log SQL queries (disable in production)
pool_size=10,
max_overflow=20
)
# Async session factory
AsyncSessionLocal = sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
chart_analyzer/                      # Project root
├── docker-compose.yml
├── Dockerfile
├── generate.sh
├── requirements.txt
├── alembic.ini                      # ✅ At root
├── alembic/                         # ✅ At root
│   ├── versions/
│   ├── env.py
│   └── script.py.mako
├── src/                             # ✅ Only one src
│   ├── application/                 # Use cases, services
│   ├── domain/                      # Entities, business logic
│   ├── infrastructure/              # DB, adapters, external services
│   │   ├── adapters/
│   │   │   ├── sqlmodel.py          # Your Base, models
│   │   │   └── __init__.py
│   │   └── persistence/             # DB session, engine
│   │       ├── database.py
│   │       └── __init__.py
│   ├── presentation/                # FastAPI routers, DTOs
│   │   ├── api/
│   │   │   ├── v1/
│   │   │   │   ├── users.py
│   │   │   │   └── charts.py
│   │   │   └── __init__.py
│   │   └── __init__.py
│   ├── main.py                      # FastAPI app factory
│   └── config.py                    # Config, settings
├── tests/
│   ├── unit/
│   ├── integration/
│   └── conftest.py
├── .env
├── .gitignore
└── README.md
from fastapi import FastAPI
from infrastructure.api.fastapi.routes import auth
app = FastAPI(
title="Chart Analyzer API",
description="API for analyzing charts and managing users.",
version="1.0.0",
)
app.include_router(auth.router, prefix="/auth")
@app.get("/")
async def root():
return {"message": "Welcome to Chart Analyzer"}