-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathuser.py
More file actions
94 lines (65 loc) · 2.63 KB
/
Copy pathuser.py
File metadata and controls
94 lines (65 loc) · 2.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import uuid
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.orm import joinedload
from python_api.db.models import User, UserRole
from python_api.services.auth import get_password_hash
async def get_user_by_id(session: AsyncSession, user_id: uuid.UUID) -> User | None:
result = await session.execute(
select(User).options(joinedload(User.role)).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def get_user_by_email(session: AsyncSession, email: str) -> User | None:
result = await session.execute(
select(User).options(joinedload(User.role)).where(User.email == email)
)
return result.scalar_one_or_none()
async def create_user(session: AsyncSession, email: str, password: str) -> User:
result = await session.execute(select(UserRole).where(UserRole.title == "normal"))
normal_role = result.scalar_one()
user = User(
email=email,
password_hash=get_password_hash(password),
role_id=normal_role.id,
is_active=True,
)
session.add(user)
await session.flush()
user_result = await session.execute(
select(User).options(joinedload(User.role)).where(User.id == user.id)
)
return user_result.scalar_one()
async def update_user_password(session: AsyncSession, user: User, new_password: str) -> None:
user.password_hash = get_password_hash(new_password)
await session.flush()
async def update_user_bio(session: AsyncSession, user: User, bio: str) -> User:
user.bio = bio
await session.flush()
return user
async def deactivate_user(session: AsyncSession, user: User) -> User:
user.is_active = False
await session.flush()
return user
async def activate_user(session: AsyncSession, user: User) -> User:
user.is_active = True
await session.flush()
return user
async def delete_user(session: AsyncSession, user: User) -> None:
await session.delete(user)
await session.flush()
async def change_user_role(
session: AsyncSession, user: User, role_id: uuid.UUID
) -> User:
user.role_id = role_id
await session.flush()
await session.refresh(user, attribute_names=["role"])
return user
async def list_all_users(session: AsyncSession) -> list[User]:
result = await session.execute(select(User).options(joinedload(User.role)))
return list(result.scalars().unique().all())
async def list_all_roles(session: AsyncSession) -> list[UserRole]:
result = await session.execute(select(UserRole))
return list(result.scalars().all())