Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/rest_framework_simplejwt/serializers.py: 60%

102 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1from django.conf import settings 

2from django.contrib.auth import authenticate, get_user_model 

3from django.contrib.auth.models import update_last_login 

4from django.utils.translation import gettext_lazy as _ 

5from rest_framework import exceptions, serializers 

6from rest_framework.exceptions import ValidationError 

7 

8from .settings import api_settings 

9from .tokens import RefreshToken, SlidingToken, UntypedToken 

10 

11if api_settings.BLACKLIST_AFTER_ROTATION: 11 ↛ 12line 11 didn't jump to line 12, because the condition on line 11 was never true

12 from .token_blacklist.models import BlacklistedToken 

13 

14 

15class PasswordField(serializers.CharField): 

16 def __init__(self, *args, **kwargs): 

17 kwargs.setdefault("style", {}) 

18 

19 kwargs["style"]["input_type"] = "password" 

20 kwargs["write_only"] = True 

21 

22 super().__init__(*args, **kwargs) 

23 

24 

25class TokenObtainSerializer(serializers.Serializer): 

26 username_field = get_user_model().USERNAME_FIELD 

27 token_class = None 

28 

29 default_error_messages = { 

30 "no_active_account": _("No active account found with the given credentials") 

31 } 

32 

33 def __init__(self, *args, **kwargs): 

34 super().__init__(*args, **kwargs) 

35 

36 self.fields[self.username_field] = serializers.CharField() 

37 self.fields["password"] = PasswordField() 

38 

39 def validate(self, attrs): 

40 authenticate_kwargs = { 

41 self.username_field: attrs[self.username_field], 

42 "password": attrs["password"], 

43 } 

44 try: 

45 authenticate_kwargs["request"] = self.context["request"] 

46 except KeyError: 

47 pass 

48 

49 self.user = authenticate(**authenticate_kwargs) 

50 

51 if not api_settings.USER_AUTHENTICATION_RULE(self.user): 51 ↛ 52line 51 didn't jump to line 52, because the condition on line 51 was never true

52 raise exceptions.AuthenticationFailed( 

53 self.error_messages["no_active_account"], 

54 "no_active_account", 

55 ) 

56 

57 return {} 

58 

59 @classmethod 

60 def get_token(cls, user): 

61 return cls.token_class.for_user(user) 

62 

63 

64class TokenObtainPairSerializer(TokenObtainSerializer): 

65 token_class = RefreshToken 

66 

67 def validate(self, attrs): 

68 data = super().validate(attrs) 

69 

70 refresh = self.get_token(self.user) 

71 

72 data["refresh"] = str(refresh) 

73 data["access"] = str(refresh.access_token) 

74 

75 if api_settings.UPDATE_LAST_LOGIN: 75 ↛ 76line 75 didn't jump to line 76, because the condition on line 75 was never true

76 update_last_login(None, self.user) 

77 

78 return data 

79 

80 

81class TokenObtainSlidingSerializer(TokenObtainSerializer): 

82 token_class = SlidingToken 

83 

84 def validate(self, attrs): 

85 data = super().validate(attrs) 

86 

87 token = self.get_token(self.user) 

88 

89 data["token"] = str(token) 

90 

91 if api_settings.UPDATE_LAST_LOGIN: 

92 update_last_login(None, self.user) 

93 

94 return data 

95 

96 

97class TokenRefreshSerializer(serializers.Serializer): 

98 refresh = serializers.CharField() 

99 access = serializers.CharField(read_only=True) 

100 token_class = RefreshToken 

101 

102 def validate(self, attrs): 

103 refresh = self.token_class(attrs["refresh"]) 

104 

105 data = {"access": str(refresh.access_token)} 

106 

107 if api_settings.ROTATE_REFRESH_TOKENS: 

108 if api_settings.BLACKLIST_AFTER_ROTATION: 

109 try: 

110 # Attempt to blacklist the given refresh token 

111 refresh.blacklist() 

112 except AttributeError: 

113 # If blacklist app not installed, `blacklist` method will 

114 # not be present 

115 pass 

116 

117 refresh.set_jti() 

118 refresh.set_exp() 

119 refresh.set_iat() 

120 

121 data["refresh"] = str(refresh) 

122 

123 return data 

124 

125 

126class TokenRefreshSlidingSerializer(serializers.Serializer): 

127 token = serializers.CharField() 

128 token_class = SlidingToken 

129 

130 def validate(self, attrs): 

131 token = self.token_class(attrs["token"]) 

132 

133 # Check that the timestamp in the "refresh_exp" claim has not 

134 # passed 

135 token.check_exp(api_settings.SLIDING_TOKEN_REFRESH_EXP_CLAIM) 

136 

137 # Update the "exp" and "iat" claims 

138 token.set_exp() 

139 token.set_iat() 

140 

141 return {"token": str(token)} 

142 

143 

144class TokenVerifySerializer(serializers.Serializer): 

145 token = serializers.CharField() 

146 

147 def validate(self, attrs): 

148 token = UntypedToken(attrs["token"]) 

149 

150 if ( 

151 api_settings.BLACKLIST_AFTER_ROTATION 

152 and "rest_framework_simplejwt.token_blacklist" in settings.INSTALLED_APPS 

153 ): 

154 jti = token.get(api_settings.JTI_CLAIM) 

155 if BlacklistedToken.objects.filter(token__jti=jti).exists(): 

156 raise ValidationError("Token is blacklisted") 

157 

158 return {} 

159 

160 

161class TokenBlacklistSerializer(serializers.Serializer): 

162 refresh = serializers.CharField() 

163 token_class = RefreshToken 

164 

165 def validate(self, attrs): 

166 refresh = self.token_class(attrs["refresh"]) 

167 try: 

168 refresh.blacklist() 

169 except AttributeError: 

170 pass 

171 return {}