Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# New SSO-login endpoint
class SSOLoginView(APIView):
    """Exchange a Digitalna Komora SSO token for a Knox auth token.

    The caller supplies a JWT in the ``sso-token`` query parameter. The view
    reads ``korisnikId`` and ``poslovniSubjektId`` from it, asks the
    Digitalna Komora host-to-host API for the matching login data, looks up a
    local ``User`` by the e-mail returned, applies the per-license device
    limit and finally issues a new Knox token bound to a device id.
    """

    # No permission classes: the endpoint is reachable without authentication.
    permission_classes = []

    @staticmethod
    def _extract_email(payload):
        """Return the e-mail nested in the upstream payload, or ``None``.

        Reads ``payload["PrijavaKorisnikData"]["PrijavaKorisnikDataKorisnik"]
        ["email"]`` and tolerates missing keys, ``null`` values and non-dict
        levels instead of raising.
        """
        node = payload
        for key in ("PrijavaKorisnikData", "PrijavaKorisnikDataKorisnik"):
            if not isinstance(node, dict):
                return None
            node = node.get(key)
        if not isinstance(node, dict):
            return None
        return node.get("email")

    def get(self, request, *args, **kwargs):
        """Handle the SSO login request.

        Returns:
            200 with ``{"token", "user": {"id", "email", "device_id"}}`` on
            success.
            400 when the token is missing, malformed, lacks the required
            claims, or the upstream payload carries no e-mail.
            403 when no local user has the upstream e-mail.
            502 when the upstream API cannot be reached or returns non-JSON.
            The upstream status code when the upstream answers with anything
            other than 200.
            500 for any other unexpected failure.
        """
        # Function-scope imports keep this block self-contained.
        import logging
        from urllib.parse import quote

        logger = logging.getLogger(__name__)

        token = request.query_params.get("sso-token")
        # Check if the token is in the URL.
        if not token:
            return Response(
                {"error": "Token not found in URL"},
                status=status.HTTP_400_BAD_REQUEST,
            )

        try:
            # SECURITY(review): the signature is NOT verified, so every claim
            # below is attacker-controlled and anyone able to craft a JWT can
            # obtain a token for any user resolvable through the upstream
            # API. Verify with the issuer's key (and pin `algorithms`) before
            # exposing this endpoint outside a test environment.
            decoded_token = jwt.decode(token, options={"verify_signature": False})

            korisnik_id = decoded_token.get("korisnikId")
            poslovni_subjekt_id = decoded_token.get("poslovniSubjektId")
            # Both ids are required. The message text is kept unchanged for
            # existing clients even though it also covers a missing
            # poslovniSubjektId.
            if not korisnik_id or not poslovni_subjekt_id:
                return Response(
                    {"error": "korisnik_id is missing from token"},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Percent-encode the unverified claims so they cannot inject
            # extra path segments or a query string into the upstream URL.
            korisnik_segment = quote(str(korisnik_id), safe="")
            subjekt_segment = quote(str(poslovni_subjekt_id), safe="")
            # API call to Digitalna Komora to get the login data.
            external_api_url = (
                "http://test-intra2019.digitalnakomora.hr/HGKClaniceHostToHostAPI"
                "/api/Korisnik/GetPrijavaKorisnikPS/"
                f"{korisnik_segment}/{subjekt_segment}"
            )
            response = requests.get(external_api_url, timeout=5)
            if response.status_code != 200:
                return Response(
                    {"error": "Data from Digitalna Komora not recived"},
                    status=response.status_code,
                )

            try:
                response_data = response.json()
            except ValueError:
                # Upstream answered 200 but the body is not JSON.
                logger.exception("SSO login: upstream returned a non-JSON body")
                return Response(
                    {"error": "Invalid response from Digitalna Komora"},
                    status=status.HTTP_502_BAD_GATEWAY,
                )

            korisnik_email = self._extract_email(response_data)
            if not korisnik_email:
                # NOTE(review): the raw upstream payload is echoed back to an
                # unauthenticated caller; confirm this is acceptable.
                return Response(
                    {
                        "error": "No email i Digitalna Komora api",
                        "responseData": response_data,
                    },
                    status=status.HTTP_400_BAD_REQUEST,
                )

            # Check for the user in the database by e-mail.
            user = User.objects.filter(email=korisnik_email).first()
            if not user:
                return Response(
                    {"error": "User not found in Tendernet"},
                    status=status.HTTP_403_FORBIDDEN,
                )

            # Reuse a device_id already recorded for this user, otherwise
            # generate a new one.
            device_token_entry = DeviceToken.objects.filter(token__user=user).first()
            if device_token_entry:
                device_id = device_token_entry.device_id
            else:
                device_id = str(uuid.uuid4())

            # Handle the device limit per license.
            company_users = user.company.users.all()
            device_limit = user.company.license.device_num
            other_devices = DeviceToken.objects.filter(
                token__user__in=company_users
            ).exclude(device_id=device_id)
            difference = other_devices.count() - device_limit
            if difference >= 0:
                # Keep only the tokens of the most recently used other
                # devices (at most `device_limit` of them); every other
                # token of the company's users is deleted.
                device_tokens = (
                    other_devices.order_by("-token__created")
                    .values_list("device_id", flat=True)
                    .distinct()
                )
                tokens = DeviceToken.objects.filter(
                    device_id__in=device_tokens[:device_limit]
                ).values_list("token", flat=True)
                AuthToken.objects.filter(user__in=company_users).exclude(
                    digest__in=tokens
                ).delete()

            # Generate the Knox token and store it against the device_id.
            instance, token = AuthToken.objects.create(user)
            DeviceToken.objects.update_or_create(
                device_id=device_id, defaults={"token": instance}
            )
            return Response(
                {
                    "token": token,
                    "user": {
                        "id": user.id,
                        "email": user.email,
                        "device_id": device_id,
                    },
                },
                status=status.HTTP_200_OK,
            )
        except jwt.InvalidTokenError:
            # A malformed token is a client error, not a server fault.
            return Response(
                {"error": "Invalid SSO token"},
                status=status.HTTP_400_BAD_REQUEST,
            )
        except requests.RequestException:
            # Timeout, DNS failure, refused connection, and similar.
            logger.exception("SSO login: request to Digitalna Komora failed")
            return Response(
                {"error": "Digitalna Komora request failed"},
                status=status.HTTP_502_BAD_GATEWAY,
            )
        except Exception:
            # Last-resort boundary: log the details server-side rather than
            # returning exception text to an unauthenticated caller.
            logger.exception("SSO login: unexpected failure")
            return Response(
                {"error": "Internal server error"},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement