Coverage for app / tasks.py: 96%
164 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-26 21:50 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-26 21:50 +0000
1from datetime import datetime, timezone
2from typing import List
4from fastapi import APIRouter, Depends, HTTPException, Body, Header
5from sqlalchemy.orm import Session
6from sqlalchemy.orm import joinedload
8from app.database import get_db
9from app.models.task import Task
10from app.models.subtask import Subtask
11from app.priorityScoring import scoreTask
12from app.schemas import TaskCreate, TaskResponse
13from app.schemas import TaskResponse, SubtaskResponse
15router = APIRouter()
17# Added for testing, can be replaced
20# if user id header not given in a request (it normally would be in practice since each task belongs to a user, but some tests don't have it)
21# it is assumed to be user id 1
22def get_current_user_id(x_user_id: int | None = Header(default=None)) -> int:
23 if x_user_id is None:
24 return 1
25 return x_user_id
28def get_owned_task(task_id: int, current_user_id: int, db: Session) -> Task:
29 task = (
30 db.query(Task)
31 .filter(Task.id == task_id, Task.user_id == current_user_id)
32 .first()
33 )
35 if task is None:
36 raise HTTPException(status_code=404, detail="Task not found")
38 return task
41def get_owned_subtask(subtask_id: int, current_user_id: int, db: Session) -> Subtask:
42 subtask = (
43 db.query(Subtask)
44 .join(Task, Subtask.task_id == Task.id)
45 .filter(Subtask.id == subtask_id, Task.user_id == current_user_id)
46 .first()
47 )
49 if subtask is None:
50 raise HTTPException(status_code=404, detail="Subtask not found")
52 return subtask
55@router.get(
56 "",
57 response_model=List[TaskResponse],
58 summary="Retrieve all tasks",
59 description="Fetches a list of all tasks from the database, including their ID, title, description, and completion status for the current user.",
60)
61def get_tasks(
62 db: Session = Depends(get_db), current_user_id: int = Depends(get_current_user_id)
63):
64 tasks = db.query(Task).options(joinedload(Task.subtasks)).filter(Task.user_id == current_user_id).all()
66 for task in tasks:
67 task.priority = scoreTask(task)
69 return tasks
72# Marking a task as complete
73@router.post("/task/{task_id}/complete")
74def complete_task(
75 task_id: int,
76 db: Session = Depends(get_db),
77 current_user_id: int = Depends(get_current_user_id),
78):
79 task = get_owned_task(task_id, current_user_id, db)
81 if task.completed:
82 raise HTTPException(status_code=400, detail="Task is already completed")
84 current_time = datetime.now(timezone.utc)
86 task.completed = True
87 task.completed_at = current_time
89 for subtask in task.subtasks:
90 subtask.completed = True
91 subtask.completed_at = current_time
93 db.commit()
94 db.refresh(task)
96 if task.completed:
97 return {"message": "Task completed"}
100# Marking a subtask as complete
101@router.post("/subtask/{subtask_id}/complete")
102def complete_subtask(
103 subtask_id: int,
104 db: Session = Depends(get_db),
105 current_user_id: int = Depends(get_current_user_id),
106):
107 subtask = get_owned_subtask(subtask_id, current_user_id, db)
109 if subtask.completed:
110 raise HTTPException(status_code=400, detail="Subtask is already completed")
112 parent_task = subtask.task
113 current_time = datetime.now(timezone.utc)
115 subtask.completed = True
116 subtask.completed_at = current_time
118 # If all subtasks of the parent task is complete, mark parent task as complete
119 if all(st.completed for st in parent_task.subtasks):
120 parent_task.completed = True
121 parent_task.completed_at = current_time
123 db.commit()
124 db.refresh(subtask)
125 db.refresh(parent_task)
127 if subtask.completed:
128 return {"message": "Subtask completed"}
131# Reopen a task
132@router.post("/task/{task_id}/reopen")
133def reopen_task(
134 task_id: int,
135 db: Session = Depends(get_db),
136 current_user_id: int = Depends(get_current_user_id),
137):
138 task = get_owned_task(task_id, current_user_id, db)
140 if not task.completed:
141 raise HTTPException(status_code=400, detail="Task already open")
143 task.completed = False
144 task.completed_at = None
146 for subtask in task.subtasks:
147 subtask.completed = False
148 subtask.completed_at = None
150 db.commit()
151 db.refresh(task)
153 if not task.completed:
154 return {"message": "Task reopened"}
157# Reopen a subtask
158@router.post("/subtask/{subtask_id}/reopen")
159def reopen_subtask(
160 subtask_id: int,
161 db: Session = Depends(get_db),
162 current_user_id: int = Depends(get_current_user_id),
163):
164 subtask = get_owned_subtask(subtask_id, current_user_id, db)
166 if not subtask.completed:
167 raise HTTPException(status_code=400, detail="Subtask already open")
169 parent_task = subtask.task
171 subtask.completed = False
172 subtask.completed_at = None
174 if parent_task.completed:
175 parent_task.completed = False
176 parent_task.completed_at = None
178 db.commit()
179 db.refresh(subtask)
181 if not subtask.completed:
182 return {"message": "Subtask reopened"}
185# ============================================================
186# CRUD endpoints for subtasks
187# ============================================================
190# Read all subtasks
191@router.get("/subtasks", response_model=List[SubtaskResponse])
192def get_all_subtasks(
193 db: Session = Depends(get_db), current_user_id: int = Depends(get_current_user_id)
194):
195 subtasks = (
196 db.query(Subtask)
197 .join(Task, Subtask.task_id == Task.id)
198 .filter(Task.user_id == current_user_id)
199 .all()
200 )
201 return subtasks
204# Read one subtask
205@router.get("/subtasks/{subtask_id}", response_model=SubtaskResponse)
206def get_subtask(
207 subtask_id: int,
208 db: Session = Depends(get_db),
209 current_user_id: int = Depends(get_current_user_id),
210):
211 subtask = get_owned_subtask(subtask_id, current_user_id, db)
213 return subtask
216# Create a new subtask
217@router.post("/subtasks", response_model=SubtaskResponse)
218def create_subtask(
219 subtask_data: dict = Body(...),
220 db: Session = Depends(get_db),
221 current_user_id: int = Depends(get_current_user_id),
222):
224 parent_task_id = subtask_data.get("task_id")
226 if parent_task_id is None:
227 raise HTTPException(status_code=400, detail="task_id is required")
229 get_owned_task(parent_task_id, current_user_id, db)
231 existing_subtasks_count = db.query(Subtask).filter(Subtask.task_id == parent_task_id).count()
233 new_subtask = Subtask(
234 title=subtask_data.get("title"),
235 task_id=parent_task_id,
236 completed=False,
237 order_index=existing_subtasks_count
238 )
240 db.add(new_subtask)
241 db.commit()
242 db.refresh(new_subtask)
244 return new_subtask
247# Update an existing subtask
248@router.put("/subtasks/{subtask_id}", response_model=SubtaskResponse)
249def update_subtask(
250 subtask_id: int,
251 subtask_data: dict = Body(...),
252 db: Session = Depends(get_db),
253 current_user_id: int = Depends(get_current_user_id),
254):
255 subtask = get_owned_subtask(subtask_id, current_user_id, db)
257 if "task_id" in subtask_data:
258 get_owned_task(subtask_data["task_id"], current_user_id, db)
260 for field, value in subtask_data.items():
261 setattr(subtask, field, value)
263 db.commit()
264 db.refresh(subtask)
266 return subtask
269# Delete a subtask
270@router.delete("/subtasks/{subtask_id}")
271def delete_subtask(
272 subtask_id: int,
273 db: Session = Depends(get_db),
274 current_user_id: int = Depends(get_current_user_id),
275):
276 subtask = get_owned_subtask(subtask_id, current_user_id, db)
278 db.delete(subtask)
279 db.commit()
281 return {"message": "Subtask deleted successfully"}
284# ============================================================
285# CRUD endpoints for tasks
286# ============================================================
289# Read one task
290@router.get("/{task_id}", response_model=TaskResponse)
291def get_task(
292 task_id: int,
293 db: Session = Depends(get_db),
294 current_user_id: int = Depends(get_current_user_id),
295):
296 task = get_owned_task(task_id, current_user_id, db)
298 task.priority = scoreTask(task)
299 return task
302# Create a new task
303@router.post("", response_model=TaskResponse)
304def create_task(
305 task_data: dict = Body(...),
306 db: Session = Depends(get_db),
307 current_user_id: int = Depends(get_current_user_id),
308):
309 # Convert due_at from JSON string into Python datetime if it is present
310 if task_data.get("due_at"):
311 task_data["due_at"] = datetime.fromisoformat(task_data["due_at"])
313 task_data["user_id"] = current_user_id
314 new_task = Task(**task_data)
316 db.add(new_task)
317 db.commit()
318 db.refresh(new_task)
320 new_task.priority = scoreTask(new_task)
321 return new_task
324# Update an existing task
325@router.put("/{task_id}", response_model=TaskResponse)
326def update_task(
327 task_id: int,
328 task_data: dict = Body(...),
329 db: Session = Depends(get_db),
330 current_user_id: int = Depends(get_current_user_id),
331):
332 task = get_owned_task(task_id, current_user_id, db)
334 # Convert due_at from JSON string into Python datetime if it is present
335 if task_data.get("due_at"):
336 task_data["due_at"] = datetime.fromisoformat(task_data["due_at"])
338 if "user_id" in task_data:
339 del task_data["user_id"]
341 for field, value in task_data.items():
342 setattr(task, field, value)
344 db.commit()
345 db.refresh(task)
347 task.priority = scoreTask(task)
348 return task
351# Delete a task
352@router.delete("/{task_id}")
353def delete_task(
354 task_id: int,
355 db: Session = Depends(get_db),
356 current_user_id: int = Depends(get_current_user_id),
357):
358 task = get_owned_task(task_id, current_user_id, db)
360 db.delete(task)
361 db.commit()
363 return {"message": "Task deleted successfully"}