Coverage for app / tasks.py: 96%

164 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-26 21:50 +0000

1from datetime import datetime, timezone 

2from typing import List 

3 

4from fastapi import APIRouter, Depends, HTTPException, Body, Header 

5from sqlalchemy.orm import Session 

6from sqlalchemy.orm import joinedload 

7 

8from app.database import get_db 

9from app.models.task import Task 

10from app.models.subtask import Subtask 

11from app.priorityScoring import scoreTask 

12from app.schemas import TaskCreate, TaskResponse 

13from app.schemas import TaskResponse, SubtaskResponse 

14 

15router = APIRouter() 

16 

17# Added for testing, can be replaced 

18 

19 

20# If the user id header is missing from a request (in practice it is normally present, since each task belongs to a user, but some tests omit it), 

21# the request is treated as coming from user id 1. 

def get_current_user_id(x_user_id: int | None = Header(default=None)) -> int:
    """Resolve the id of the user on whose behalf the request runs.

    Args:
        x_user_id: Value of the user id header, or ``None`` when the
            request did not carry one.

    Returns:
        The supplied header value, or ``1`` when the header is absent.
    """
    return 1 if x_user_id is None else x_user_id

26 

27 

def get_owned_task(task_id: int, current_user_id: int, db: Session) -> Task:
    """Load a task by id, restricted to tasks owned by the given user.

    Args:
        task_id: Primary key of the task to look up.
        current_user_id: Id of the user who must own the task.
        db: Database session used for the query.

    Returns:
        The first ``Task`` row matching both the id and the owner.

    Raises:
        HTTPException: 404 when no such row exists. A task owned by another
            user is reported the same way as a missing task.
    """
    owned_query = db.query(Task).filter(
        Task.id == task_id, Task.user_id == current_user_id
    )
    found = owned_query.first()

    if found is not None:
        return found

    raise HTTPException(status_code=404, detail="Task not found")

39 

40 

def get_owned_subtask(subtask_id: int, current_user_id: int, db: Session) -> Subtask:
    """Load a subtask by id, restricted to subtasks whose parent task the user owns.

    Args:
        subtask_id: Primary key of the subtask to look up.
        current_user_id: Id of the user who must own the parent task.
        db: Database session used for the query.

    Returns:
        The first ``Subtask`` row whose id matches and whose parent task
        belongs to ``current_user_id``.

    Raises:
        HTTPException: 404 when no such row exists.
    """
    # Ownership lives on the parent task, so join through it to filter by user.
    joined = db.query(Subtask).join(Task, Subtask.task_id == Task.id)
    found = joined.filter(
        Subtask.id == subtask_id, Task.user_id == current_user_id
    ).first()

    if found is not None:
        return found

    raise HTTPException(status_code=404, detail="Subtask not found")

53 

54 

@router.get(
    "",
    response_model=List[TaskResponse],
    summary="Retrieve all tasks",
    description="Fetches a list of all tasks from the database, including their ID, title, description, and completion status for the current user.",
)
def get_tasks(
    db: Session = Depends(get_db), current_user_id: int = Depends(get_current_user_id)
):
    """Return every task owned by the current user, each with a priority score.

    Subtasks are loaded together with their tasks via ``joinedload``, and
    ``scoreTask`` is applied to each task before the list is returned.
    """
    user_tasks_query = (
        db.query(Task)
        .options(joinedload(Task.subtasks))
        .filter(Task.user_id == current_user_id)
    )
    owned_tasks = user_tasks_query.all()

    # The score is assigned on the loaded object; nothing is committed here.
    for owned in owned_tasks:
        owned.priority = scoreTask(owned)

    return owned_tasks

70 

71 

72# Marking a task as complete 

@router.post("/task/{task_id}/complete")
def complete_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user_id: int = Depends(get_current_user_id),
):
    """Mark a task and all of its subtasks as completed.

    Args:
        task_id: Id of the task to complete.
        db: Database session.
        current_user_id: Id of the requesting user; the task must belong to them.

    Returns:
        ``{"message": "Task completed"}`` once the change is committed.

    Raises:
        HTTPException: 404 when the task does not exist for this user
            (raised by ``get_owned_task``), 400 when it is already completed.
    """
    task = get_owned_task(task_id, current_user_id, db)

    if task.completed:
        raise HTTPException(status_code=400, detail="Task is already completed")

    # One timestamp for the task and its subtasks so they agree exactly.
    current_time = datetime.now(timezone.utc)

    task.completed = True
    task.completed_at = current_time

    for subtask in task.subtasks:
        subtask.completed = True
        subtask.completed_at = current_time

    db.commit()

    # Return unconditionally: the previous post-commit `if task.completed`
    # check left an implicit `None` return path, which would surface as a
    # 200 response with a null body. The refresh that only fed that check
    # is no longer needed.
    return {"message": "Task completed"}

98 

99 

100# Marking a subtask as complete 

@router.post("/subtask/{subtask_id}/complete")
def complete_subtask(
    subtask_id: int,
    db: Session = Depends(get_db),
    current_user_id: int = Depends(get_current_user_id),
):
    """Mark a subtask as completed, completing its parent when it was the last one.

    Args:
        subtask_id: Id of the subtask to complete.
        db: Database session.
        current_user_id: Id of the requesting user; the parent task must
            belong to them.

    Returns:
        ``{"message": "Subtask completed"}`` once the change is committed.

    Raises:
        HTTPException: 404 when the subtask does not exist for this user
            (raised by ``get_owned_subtask``), 400 when it is already completed.
    """
    subtask = get_owned_subtask(subtask_id, current_user_id, db)

    if subtask.completed:
        raise HTTPException(status_code=400, detail="Subtask is already completed")

    parent_task = subtask.task
    # Shared timestamp so the subtask and (if completed) its parent agree.
    current_time = datetime.now(timezone.utc)

    subtask.completed = True
    subtask.completed_at = current_time

    # If all subtasks of the parent task are now complete, mark the parent complete too.
    if all(st.completed for st in parent_task.subtasks):
        parent_task.completed = True
        parent_task.completed_at = current_time

    db.commit()

    # Return unconditionally: the previous post-commit `if subtask.completed`
    # check left an implicit `None` return path (200 with a null body). The
    # refreshes that only fed that check are no longer needed.
    return {"message": "Subtask completed"}

129 

130 

131# Reopen a task 

@router.post("/task/{task_id}/reopen")
def reopen_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user_id: int = Depends(get_current_user_id),
):
    """Reopen a completed task together with all of its subtasks.

    Args:
        task_id: Id of the task to reopen.
        db: Database session.
        current_user_id: Id of the requesting user; the task must belong to them.

    Returns:
        ``{"message": "Task reopened"}`` once the change is committed.

    Raises:
        HTTPException: 404 when the task does not exist for this user
            (raised by ``get_owned_task``), 400 when it is already open.
    """
    task = get_owned_task(task_id, current_user_id, db)

    if not task.completed:
        raise HTTPException(status_code=400, detail="Task already open")

    task.completed = False
    task.completed_at = None

    # Reopening the task clears the completion state of every subtask as well.
    for subtask in task.subtasks:
        subtask.completed = False
        subtask.completed_at = None

    db.commit()

    # Return unconditionally: the previous post-commit `if not task.completed`
    # check left an implicit `None` return path (200 with a null body). The
    # refresh that only fed that check is no longer needed.
    return {"message": "Task reopened"}

155 

156 

157# Reopen a subtask 

@router.post("/subtask/{subtask_id}/reopen")
def reopen_subtask(
    subtask_id: int,
    db: Session = Depends(get_db),
    current_user_id: int = Depends(get_current_user_id),
):
    """Reopen a completed subtask, reopening its parent task if that was completed.

    Args:
        subtask_id: Id of the subtask to reopen.
        db: Database session.
        current_user_id: Id of the requesting user; the parent task must
            belong to them.

    Returns:
        ``{"message": "Subtask reopened"}`` once the change is committed.

    Raises:
        HTTPException: 404 when the subtask does not exist for this user
            (raised by ``get_owned_subtask``), 400 when it is already open.
    """
    subtask = get_owned_subtask(subtask_id, current_user_id, db)

    if not subtask.completed:
        raise HTTPException(status_code=400, detail="Subtask already open")

    parent_task = subtask.task

    subtask.completed = False
    subtask.completed_at = None

    # A task with an open subtask is not left marked as completed.
    if parent_task.completed:
        parent_task.completed = False
        parent_task.completed_at = None

    db.commit()

    # Return unconditionally: the previous post-commit `if not subtask.completed`
    # check left an implicit `None` return path (200 with a null body). The
    # refresh that only fed that check is no longer needed.
    return {"message": "Subtask reopened"}

183 

184 

185# ============================================================ 

186# CRUD endpoints for subtasks 

187# ============================================================ 

188 

189 

190# Read all subtasks 

@router.get("/subtasks", response_model=List[SubtaskResponse])
def get_all_subtasks(
    db: Session = Depends(get_db), current_user_id: int = Depends(get_current_user_id)
):
    """Return every subtask whose parent task belongs to the current user."""
    # Ownership is recorded on the parent task, hence the join.
    joined = db.query(Subtask).join(Task, Subtask.task_id == Task.id)
    owned_only = joined.filter(Task.user_id == current_user_id)
    return owned_only.all()

202 

203 

204# Read one subtask 

@router.get("/subtasks/{subtask_id}", response_model=SubtaskResponse)
def get_subtask(
    subtask_id: int,
    db: Session = Depends(get_db),
    current_user_id: int = Depends(get_current_user_id),
):
    """Return a single subtask owned (through its parent task) by the current user.

    Raises:
        HTTPException: 404 from ``get_owned_subtask`` when no matching
            subtask exists for this user.
    """
    return get_owned_subtask(subtask_id, current_user_id, db)

214 

215 

216# Create a new subtask 

@router.post("/subtasks", response_model=SubtaskResponse)
def create_subtask(
    subtask_data: dict = Body(...),
    db: Session = Depends(get_db),
    current_user_id: int = Depends(get_current_user_id),
):
    """Create a subtask under one of the current user's tasks.

    Args:
        subtask_data: Request body. ``task_id`` is required and ``title`` is
            read from it; no other keys are used.
        db: Database session.
        current_user_id: Id of the requesting user; the parent task must
            belong to them.

    Returns:
        The newly created, refreshed ``Subtask``.

    Raises:
        HTTPException: 400 when ``task_id`` is missing, 404 (from
            ``get_owned_task``) when the parent task does not exist for
            this user.
    """
    parent_task_id = subtask_data.get("task_id")

    if parent_task_id is None:
        raise HTTPException(status_code=400, detail="task_id is required")

    # Called only for its ownership check; raises 404 if the task is not ours.
    get_owned_task(parent_task_id, current_user_id, db)

    sibling_count = (
        db.query(Subtask).filter(Subtask.task_id == parent_task_id).count()
    )
    highest_row = (
        db.query(Subtask.order_index)
        .filter(
            Subtask.task_id == parent_task_id,
            Subtask.order_index.isnot(None),
        )
        .order_by(Subtask.order_index.desc())
        .first()
    )

    # Using the sibling count alone reuses an index after a deletion
    # (indices 0,1,2 -> delete 0 -> count is 2, which is already taken).
    # Taking the larger of the count and highest+1 keeps the old value when
    # indices are contiguous and avoids the collision otherwise.
    if highest_row is None:
        next_index = sibling_count
    else:
        next_index = max(sibling_count, highest_row[0] + 1)

    # NOTE(review): title is passed through unvalidated and may be None;
    # confirm the model/schema tolerates that or validate it here.
    new_subtask = Subtask(
        title=subtask_data.get("title"),
        task_id=parent_task_id,
        completed=False,
        order_index=next_index,
    )

    db.add(new_subtask)
    db.commit()
    db.refresh(new_subtask)

    return new_subtask

245 

246 

247# Update an existing subtask 

@router.put("/subtasks/{subtask_id}", response_model=SubtaskResponse)
def update_subtask(
    subtask_id: int,
    subtask_data: dict = Body(...),
    db: Session = Depends(get_db),
    current_user_id: int = Depends(get_current_user_id),
):
    """Update fields of a subtask owned (through its parent task) by the current user.

    Args:
        subtask_id: Id of the subtask to update.
        subtask_data: Request body mapping attribute names to new values.
            ``id`` and names starting with ``_`` are ignored.
        db: Database session.
        current_user_id: Id of the requesting user.

    Returns:
        The updated, refreshed ``Subtask``.

    Raises:
        HTTPException: 404 when the subtask does not exist for this user, or
            when ``task_id`` in the body names a task this user does not own.
    """
    subtask = get_owned_subtask(subtask_id, current_user_id, db)

    # Moving a subtask is only allowed onto another task the user owns.
    if "task_id" in subtask_data:
        get_owned_task(subtask_data["task_id"], current_user_id, db)

    for field, value in subtask_data.items():
        # The body is client-controlled: never let it rewrite the primary
        # key or private/internal attributes of the mapped object.
        if field == "id" or field.startswith("_"):
            continue
        setattr(subtask, field, value)

    db.commit()
    db.refresh(subtask)

    return subtask

267 

268 

269# Delete a subtask 

@router.delete("/subtasks/{subtask_id}")
def delete_subtask(
    subtask_id: int,
    db: Session = Depends(get_db),
    current_user_id: int = Depends(get_current_user_id),
):
    """Delete a subtask owned (through its parent task) by the current user.

    Returns:
        ``{"message": "Subtask deleted successfully"}`` after the commit.

    Raises:
        HTTPException: 404 from ``get_owned_subtask`` when no matching
            subtask exists for this user.
    """
    doomed = get_owned_subtask(subtask_id, current_user_id, db)
    db.delete(doomed)
    db.commit()

    confirmation = {"message": "Subtask deleted successfully"}
    return confirmation

282 

283 

284# ============================================================ 

285# CRUD endpoints for tasks 

286# ============================================================ 

287 

288 

289# Read one task 

@router.get("/{task_id}", response_model=TaskResponse)
def get_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user_id: int = Depends(get_current_user_id),
):
    """Return one task owned by the current user, with its priority score set.

    Raises:
        HTTPException: 404 from ``get_owned_task`` when no matching task
            exists for this user.
    """
    found = get_owned_task(task_id, current_user_id, db)
    # The score is computed per request and assigned on the loaded object.
    found.priority = scoreTask(found)
    return found

300 

301 

302# Create a new task 

@router.post("", response_model=TaskResponse)
def create_task(
    task_data: dict = Body(...),
    db: Session = Depends(get_db),
    current_user_id: int = Depends(get_current_user_id),
):
    """Create a task for the current user.

    Args:
        task_data: Request body; its keys are passed to the ``Task``
            constructor. A truthy ``due_at`` must be an ISO-format datetime
            string. Any ``user_id`` in the body is overwritten with the
            current user's id.
        db: Database session.
        current_user_id: Id of the requesting user, stored as the owner.

    Returns:
        The newly created, refreshed ``Task`` with its priority score set.

    Raises:
        HTTPException: 400 when ``due_at`` cannot be parsed or when the body
            contains a field the ``Task`` constructor rejects.
    """
    # Convert due_at from JSON string into Python datetime if it is present.
    raw_due_at = task_data.get("due_at")
    if raw_due_at:
        try:
            task_data["due_at"] = datetime.fromisoformat(raw_due_at)
        except (TypeError, ValueError) as exc:
            # A malformed or non-string value is a client error, not a 500.
            raise HTTPException(
                status_code=400,
                detail="due_at must be an ISO 8601 datetime string",
            ) from exc

    # The owner always comes from the request context, never from the body.
    task_data["user_id"] = current_user_id

    try:
        new_task = Task(**task_data)
    except TypeError as exc:
        # A constructor that does not accept one of the supplied keywords
        # raises TypeError; report it as a bad request instead of a 500.
        raise HTTPException(
            status_code=400, detail="Request body contains an unknown task field"
        ) from exc

    db.add(new_task)
    db.commit()
    db.refresh(new_task)

    new_task.priority = scoreTask(new_task)
    return new_task

322 

323 

324# Update an existing task 

@router.put("/{task_id}", response_model=TaskResponse)
def update_task(
    task_id: int,
    task_data: dict = Body(...),
    db: Session = Depends(get_db),
    current_user_id: int = Depends(get_current_user_id),
):
    """Update fields of a task owned by the current user.

    Args:
        task_id: Id of the task to update.
        task_data: Request body mapping attribute names to new values. A
            truthy ``due_at`` must be an ISO-format datetime string.
            ``user_id``, ``id`` and names starting with ``_`` are ignored.
        db: Database session.
        current_user_id: Id of the requesting user; the task must belong to them.

    Returns:
        The updated, refreshed ``Task`` with its priority score recomputed.

    Raises:
        HTTPException: 404 (from ``get_owned_task``) when the task does not
            exist for this user, 400 when ``due_at`` cannot be parsed.
    """
    task = get_owned_task(task_id, current_user_id, db)

    # Convert due_at from JSON string into Python datetime if it is present.
    raw_due_at = task_data.get("due_at")
    if raw_due_at:
        try:
            task_data["due_at"] = datetime.fromisoformat(raw_due_at)
        except (TypeError, ValueError) as exc:
            # A malformed or non-string value is a client error, not a 500.
            raise HTTPException(
                status_code=400,
                detail="due_at must be an ISO 8601 datetime string",
            ) from exc

    for field, value in task_data.items():
        # The body is client-controlled: ownership and the primary key must
        # not change through it, nor private/internal attributes.
        if field in ("user_id", "id") or field.startswith("_"):
            continue
        setattr(task, field, value)

    db.commit()
    db.refresh(task)

    task.priority = scoreTask(task)
    return task

349 

350 

351# Delete a task 

@router.delete("/{task_id}")
def delete_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user_id: int = Depends(get_current_user_id),
):
    """Delete a task owned by the current user.

    Returns:
        ``{"message": "Task deleted successfully"}`` after the commit.

    Raises:
        HTTPException: 404 from ``get_owned_task`` when no matching task
            exists for this user.
    """
    doomed = get_owned_task(task_id, current_user_id, db)
    db.delete(doomed)
    db.commit()

    confirmation = {"message": "Task deleted successfully"}
    return confirmation