diff --git a/blt/urls.py b/blt/urls.py index 11d7c23f8..77b135b76 100644 --- a/blt/urls.py +++ b/blt/urls.py @@ -627,7 +627,6 @@ path("time-logs/", TimeLogListView, name="time_logs"), path("recommend//", recommend_user, name="recommend_user"), path("recommend//blurb/", recommend_via_blurb, name="recommend_via_blurb"), - path("blog/", include("blog.urls")), path("sizzle-daily-log/", sizzle_daily_log, name="sizzle_daily_log"), path( "user-sizzle-report//", diff --git a/website/views/user.py b/website/views/user.py index 401e74477..80b489fd0 100644 --- a/website/views/user.py +++ b/website/views/user.py @@ -1,3 +1,5 @@ +import hashlib +import hmac import json import os from datetime import datetime, timezone @@ -38,6 +40,8 @@ from blt import settings from website.forms import MonitorForm, UserDeleteForm, UserProfileForm from website.models import ( + IP, + Badge, Domain, Hunt, InviteFriend, @@ -49,6 +53,7 @@ Recommendation, Tag, User, + UserBadge, UserProfile, Wallet, ) @@ -161,7 +166,7 @@ def post(self, request, *args, **kwargs): logout(request) user.delete() messages.success(request, "Account successfully deleted") - return redirect(reverse("index")) + return redirect(reverse("home")) return render(request, "user_deletion.html", {"form": form}) @@ -202,14 +207,30 @@ def get(self, request, *args, **kwargs): except Http404: messages.error(self.request, "That user was not found.") return redirect("/") + + # Update the view count and save the model + self.object.userprofile.visit_count = len(IP.objects.filter(path=request.path)) + self.object.userprofile.save() + return super(UserProfileDetailView, self).get(request, *args, **kwargs) def get_context_data(self, **kwargs): + # if userprofile does not exist, create it + if not UserProfile.objects.filter(user=self.object).exists(): + UserProfile.objects.create(user=self.object) + user = self.object context = super(UserProfileDetailView, self).get_context_data(**kwargs) - context["my_score"] = list( - Points.objects.filter(user=self.object).aggregate(total_score=Sum("score")).values() - )[0] + # Fetch badges + user_badges = UserBadge.objects.filter(user=user).select_related("badge") + + context["user_badges"] = user_badges # Add badges to context + context["is_mentor"] = UserBadge.objects.filter(user=user, badge__title="Mentor").exists() + context["available_badges"] = Badge.objects.all() + + user_points = Points.objects.filter(user=self.object) + context["user_points"] = user_points + context["my_score"] = list(user_points.aggregate(total_score=Sum("score")).values())[0] context["websites"] = ( Domain.objects.filter(issue__user=self.object) .annotate(total=Count("issue")) @@ -966,3 +987,167 @@ def profile(request): return redirect("/profile/" + request.user.username) except Exception: return redirect("/") + + +@login_required +def assign_badge(request, username): + if not UserBadge.objects.filter(user=request.user, badge__title="Mentor").exists(): + messages.error(request, "You don't have permission to assign badges.") + return redirect("profile", slug=username) + + user = get_object_or_404(get_user_model(), username=username) + badge_id = request.POST.get("badge") + reason = request.POST.get("reason", "") + badge = get_object_or_404(Badge, id=badge_id) + + # Check if the user already has this badge + if UserBadge.objects.filter(user=user, badge=badge).exists(): + messages.warning(request, "This user already has this badge.") + return redirect("profile", slug=username) + + # Assign the badge to user + UserBadge.objects.create(user=user, badge=badge, awarded_by=request.user, reason=reason) + messages.success(request, f"{badge.title} badge assigned to {user.username}.") + return redirect("profile", slug=username) + + +def badge_user_list(request, badge_id): + badge = get_object_or_404(Badge, id=badge_id) + + profiles = ( + UserProfile.objects.filter(user__userbadge__badge=badge) + .select_related("user") + .distinct() + .annotate(awarded_at=F("user__userbadge__awarded_at")) + ) + + return render( + request, + "badge_user_list.html", + { + "badge": badge, + "profiles": profiles, + }, + ) + + +@csrf_exempt +def github_webhook(request): + if request.method == "POST": + # Validate GitHub signature + signature = request.headers.get("X-Hub-Signature-256") + if not validate_signature(request.body, signature): + return JsonResponse({"status": "error", "message": "Unauthorized request"}, status=403) + + payload = json.loads(request.body) + event_type = request.headers.get("X-GitHub-Event", "") + + event_handlers = { + "pull_request": handle_pull_request_event, + "push": handle_push_event, + "pull_request_review": handle_review_event, + "issues": handle_issue_event, + "status": handle_status_event, + "fork": handle_fork_event, + "create": handle_create_event, + } + + handler = event_handlers.get(event_type) + if handler: + return handler(payload) + else: + return JsonResponse({"status": "error", "message": "Unhandled event type"}, status=400) + else: + return JsonResponse({"status": "error", "message": "Invalid method"}, status=400) + + +def handle_pull_request_event(payload): + if payload["action"] == "closed" and payload["pull_request"]["merged"]: + pr_user_profile = UserProfile.objects.filter( + github_url=payload["pull_request"]["user"]["html_url"] + ).first() + if pr_user_profile: + pr_user_instance = pr_user_profile.user + assign_github_badge(pr_user_instance, "First PR Merged") + return JsonResponse({"status": "success"}, status=200) + + +def handle_push_event(payload): + pusher_profile = UserProfile.objects.filter(github_url=payload["sender"]["html_url"]).first() + if pusher_profile: + pusher_user = pusher_profile.user + if payload.get("commits"): + assign_github_badge(pusher_user, "First Commit") + return JsonResponse({"status": "success"}, status=200) + + +def handle_review_event(payload): + reviewer_profile = UserProfile.objects.filter(github_url=payload["sender"]["html_url"]).first() + if reviewer_profile: + reviewer_user = reviewer_profile.user + assign_github_badge(reviewer_user, "First Code Review") + return JsonResponse({"status": "success"}, status=200) + + +def handle_issue_event(payload): + print("issue closed") + if payload["action"] == "closed": + closer_profile = UserProfile.objects.filter( + github_url=payload["sender"]["html_url"] + ).first() + if closer_profile: + closer_user = closer_profile.user + assign_github_badge(closer_user, "First Issue Closed") + return JsonResponse({"status": "success"}, status=200) + + +def handle_status_event(payload): + user_profile = UserProfile.objects.filter(github_url=payload["sender"]["html_url"]).first() + if user_profile: + user = user_profile.user + build_status = payload["state"] + if build_status == "success": + assign_github_badge(user, "First CI Build Passed") + elif build_status == "failure": + assign_github_badge(user, "First CI Build Failed") + return JsonResponse({"status": "success"}, status=200) + + +def handle_fork_event(payload): + user_profile = UserProfile.objects.filter(github_url=payload["sender"]["html_url"]).first() + if user_profile: + user = user_profile.user + assign_github_badge(user, "First Fork Created") + return JsonResponse({"status": "success"}, status=200) + + +def handle_create_event(payload): + if payload["ref_type"] == "branch": + user_profile = UserProfile.objects.filter(github_url=payload["sender"]["html_url"]).first() + if user_profile: + user = user_profile.user + assign_github_badge(user, "First Branch Created") + return JsonResponse({"status": "success"}, status=200) + + +def assign_github_badge(user, action_title): + try: + badge, created = Badge.objects.get_or_create(title=action_title, type="automatic") + if not UserBadge.objects.filter(user=user, badge=badge).exists(): + UserBadge.objects.create(user=user, badge=badge) + print(f"Assigned '{action_title}' badge to {user.username}") + else: + print(f"{user.username} already has the '{action_title}' badge.") + except Badge.DoesNotExist: + print(f"Badge '{action_title}' does not exist.") + + +def validate_signature(payload, signature): + if not signature: + return False + + secret = bytes(os.environ.get("GITHUB_ACCESS_TOKEN", ""), "utf-8") + computed_hmac = hmac.new(secret, payload, hashlib.sha256) + computed_signature = f"sha256={computed_hmac.hexdigest()}" + + return hmac.compare_digest(computed_signature, signature)