Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/rest_framework_simplejwt/serializers.py: 60%
102 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1from django.conf import settings
2from django.contrib.auth import authenticate, get_user_model
3from django.contrib.auth.models import update_last_login
4from django.utils.translation import gettext_lazy as _
5from rest_framework import exceptions, serializers
6from rest_framework.exceptions import ValidationError
8from .settings import api_settings
9from .tokens import RefreshToken, SlidingToken, UntypedToken
11if api_settings.BLACKLIST_AFTER_ROTATION: 11 ↛ 12line 11 didn't jump to line 12, because the condition on line 11 was never true
12 from .token_blacklist.models import BlacklistedToken
class PasswordField(serializers.CharField):
    """Character field preconfigured for password entry.

    Forces the ``input_type`` style hint to ``"password"`` and marks the
    field as write-only before delegating to ``serializers.CharField``.
    """

    def __init__(self, *args, **kwargs):
        # Reuse the caller's ``style`` dict when one was passed; otherwise
        # install a fresh one. Either way the same dict object ends up in
        # ``kwargs`` and receives the ``input_type`` entry.
        style = kwargs.setdefault("style", {})
        style["input_type"] = "password"

        # Unconditionally overrides any ``write_only`` value from the caller.
        kwargs["write_only"] = True

        super().__init__(*args, **kwargs)
class TokenObtainSerializer(serializers.Serializer):
    """Base serializer that authenticates a username-field/password pair.

    Subclasses are expected to set ``token_class``; ``get_token`` calls
    ``token_class.for_user`` to build a token for the authenticated user.
    """

    # Name of the credential field, taken from the active user model.
    username_field = get_user_model().USERNAME_FIELD
    # Token type produced by ``get_token``; ``None`` here, set by subclasses.
    token_class = None

    default_error_messages = {
        "no_active_account": _("No active account found with the given credentials")
    }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # The fields are added per instance because the name of the
        # username field is only known through ``username_field``.
        self.fields[self.username_field] = serializers.CharField()
        self.fields["password"] = PasswordField()

    def validate(self, attrs):
        """Authenticate the submitted credentials and store the user.

        The result of ``authenticate`` is kept on ``self.user`` and checked
        with ``api_settings.USER_AUTHENTICATION_RULE``.

        Returns:
            An empty dict for subclasses to fill in.

        Raises:
            exceptions.AuthenticationFailed: with code
                ``"no_active_account"`` when the authentication rule
                rejects the user.
        """
        credentials = {
            self.username_field: attrs[self.username_field],
            "password": attrs["password"],
        }

        # Forward the request to ``authenticate`` only when the serializer
        # context actually carries one.
        try:
            request = self.context["request"]
        except KeyError:
            pass
        else:
            credentials["request"] = request

        self.user = authenticate(**credentials)

        user_is_allowed = api_settings.USER_AUTHENTICATION_RULE(self.user)
        if not user_is_allowed:
            raise exceptions.AuthenticationFailed(
                self.error_messages["no_active_account"],
                "no_active_account",
            )

        return {}

    @classmethod
    def get_token(cls, user):
        """Return ``cls.token_class.for_user(user)``."""
        token_class = cls.token_class
        return token_class.for_user(user)
class TokenObtainPairSerializer(TokenObtainSerializer):
    """Issue a refresh/access token pair for valid credentials."""

    token_class = RefreshToken

    def validate(self, attrs):
        """Authenticate via the parent class and attach both tokens.

        Returns:
            The parent's result dict extended with ``"refresh"`` and
            ``"access"`` entries holding the string form of each token.
        """
        result = super().validate(attrs)

        refresh_token = self.get_token(self.user)
        # Keyword arguments are evaluated left to right, so the refresh
        # token is serialized before the access token is derived from it.
        result.update(
            refresh=str(refresh_token),
            access=str(refresh_token.access_token),
        )

        if api_settings.UPDATE_LAST_LOGIN:
            update_last_login(None, self.user)

        return result
class TokenObtainSlidingSerializer(TokenObtainSerializer):
    """Issue a single sliding token for valid credentials."""

    token_class = SlidingToken

    def validate(self, attrs):
        """Authenticate via the parent class and attach the sliding token.

        Returns:
            The parent's result dict extended with a ``"token"`` entry
            holding the string form of the token.
        """
        result = super().validate(attrs)

        sliding_token = self.get_token(self.user)
        result["token"] = str(sliding_token)

        if api_settings.UPDATE_LAST_LOGIN:
            update_last_login(None, self.user)

        return result
class TokenRefreshSerializer(serializers.Serializer):
    """Exchange a refresh token for a new access token.

    When ``api_settings.ROTATE_REFRESH_TOKENS`` is enabled, the refresh
    token is also re-stamped and returned alongside the access token.
    """

    refresh = serializers.CharField()
    access = serializers.CharField(read_only=True)
    token_class = RefreshToken

    def validate(self, attrs):
        """Build the response payload from the submitted refresh token.

        Returns:
            A dict with an ``"access"`` entry and, when rotation is
            enabled, a ``"refresh"`` entry with the rotated token.
        """
        refresh_token = self.token_class(attrs["refresh"])

        # The access token is derived before any rotation touches the
        # refresh token's claims.
        payload = {"access": str(refresh_token.access_token)}

        if not api_settings.ROTATE_REFRESH_TOKENS:
            return payload

        if api_settings.BLACKLIST_AFTER_ROTATION:
            try:
                # Try to blacklist the refresh token that was handed in.
                refresh_token.blacklist()
            except AttributeError:
                # Without the blacklist app installed there is no
                # `blacklist` method, so this is skipped.
                pass

        refresh_token.set_jti()
        refresh_token.set_exp()
        refresh_token.set_iat()

        payload["refresh"] = str(refresh_token)

        return payload
class TokenRefreshSlidingSerializer(serializers.Serializer):
    """Refresh a sliding token by re-stamping its ``exp`` and ``iat`` claims."""

    token = serializers.CharField()
    token_class = SlidingToken

    def validate(self, attrs):
        """Return the submitted sliding token with updated timestamps.

        Returns:
            A dict with a single ``"token"`` entry holding the string form
            of the refreshed token.
        """
        sliding_token = self.token_class(attrs["token"])

        # Verify that the timestamp stored in the "refresh_exp" claim has
        # not passed yet.
        refresh_claim = api_settings.SLIDING_TOKEN_REFRESH_EXP_CLAIM
        sliding_token.check_exp(refresh_claim)

        # Re-stamp the "exp" and "iat" claims.
        sliding_token.set_exp()
        sliding_token.set_iat()

        return {"token": str(sliding_token)}
class TokenVerifySerializer(serializers.Serializer):
    """Verify a token and, when blacklisting is active, reject blacklisted ones."""

    token = serializers.CharField()

    def validate(self, attrs):
        """Validate the submitted token.

        Constructs an ``UntypedToken`` from the submitted value. When
        ``api_settings.BLACKLIST_AFTER_ROTATION`` is enabled and the
        blacklist app is listed in ``settings.INSTALLED_APPS``, the token's
        JTI claim is looked up among the blacklisted tokens.

        Returns:
            An empty dict.

        Raises:
            ValidationError: if the token's JTI is blacklisted.
        """
        token = UntypedToken(attrs["token"])

        if (
            api_settings.BLACKLIST_AFTER_ROTATION
            and "rest_framework_simplejwt.token_blacklist" in settings.INSTALLED_APPS
        ):
            # Import here instead of relying on the module-level conditional
            # import: that one only runs if BLACKLIST_AFTER_ROTATION was
            # already true at import time, whereas this branch re-reads the
            # setting on every call. A setting enabled after import would
            # otherwise raise NameError on BlacklistedToken.
            from .token_blacklist.models import BlacklistedToken

            jti = token.get(api_settings.JTI_CLAIM)
            if BlacklistedToken.objects.filter(token__jti=jti).exists():
                raise ValidationError("Token is blacklisted")

        return {}
class TokenBlacklistSerializer(serializers.Serializer):
    """Blacklist a submitted refresh token on a best-effort basis."""

    refresh = serializers.CharField()
    token_class = RefreshToken

    def validate(self, attrs):
        """Attempt to blacklist the submitted refresh token.

        An ``AttributeError`` from the ``blacklist()`` call is ignored.

        Returns:
            An empty dict.
        """
        refresh_token = self.token_class(attrs["refresh"])

        try:
            refresh_token.blacklist()
        except AttributeError:
            # Deliberately ignored: blacklisting is best-effort here.
            pass

        return {}