Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/sentry_sdk/envelope.py: 21%

189 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2023-07-17 14:22 -0600

1import io 

2import json 

3import mimetypes 

4 

5from sentry_sdk._compat import text_type, PY2 

6from sentry_sdk._types import MYPY 

7from sentry_sdk.session import Session 

8from sentry_sdk.utils import json_dumps, capture_internal_exceptions 

9 

10if MYPY: 10 ↛ 11line 10 didn't jump to line 11, because the condition on line 10 was never true

11 from typing import Any 

12 from typing import Optional 

13 from typing import Union 

14 from typing import Dict 

15 from typing import List 

16 from typing import Iterator 

17 

18 from sentry_sdk._types import Event, EventDataCategory 

19 

20 

def parse_json(data):
    # type: (Union[bytes, text_type]) -> Any
    """
    Parse a JSON document given either as bytes or as text.

    On Python 3, bytes input is decoded as UTF-8 first (undecodable
    sequences are replaced rather than raising); any other input is handed
    to ``json.loads`` unchanged.
    """
    must_decode = isinstance(data, bytes) and not PY2
    text = data.decode("utf-8", "replace") if must_decode else data
    return json.loads(text)

27 

28 

class Envelope(object):
    """
    A header mapping together with an ordered list of items.

    The serialized form is the headers as one JSON line followed by the
    serialized form of every item, in insertion order.
    """

    def __init__(
        self,
        headers=None,  # type: Optional[Dict[str, Any]]
        items=None,  # type: Optional[List[Item]]
    ):
        # type: (...) -> None
        # Both containers are copied, so the envelope never shares state
        # with the objects passed in by the caller.
        self.headers = dict(headers) if headers is not None else {}
        self.items = [] if items is None else list(items)

    @property
    def description(self):
        # type: (...) -> str
        """
        Short human readable summary: item count plus the data category
        of every item.
        """
        count = len(self.items)
        categories = ", ".join(entry.data_category for entry in self.items)
        return "envelope with %s items (%s)" % (count, categories)

    def add_event(
        self, event  # type: Event
    ):
        # type: (...) -> None
        """
        Append *event* as a JSON item of type "event".
        """
        payload = PayloadRef(json=event)
        self.add_item(Item(payload=payload, type="event"))

    def add_transaction(
        self, transaction  # type: Event
    ):
        # type: (...) -> None
        """
        Append *transaction* as a JSON item of type "transaction".
        """
        payload = PayloadRef(json=transaction)
        self.add_item(Item(payload=payload, type="transaction"))

    def add_session(
        self, session  # type: Union[Session, Any]
    ):
        # type: (...) -> None
        """
        Append a session as a JSON item of type "session".

        ``Session`` instances are converted with ``to_json()``; anything
        else is used as the JSON payload as-is.
        """
        body = session.to_json() if isinstance(session, Session) else session
        self.add_item(Item(payload=PayloadRef(json=body), type="session"))

    def add_sessions(
        self, sessions  # type: Any
    ):
        # type: (...) -> None
        """
        Append *sessions* as a JSON item of type "sessions".
        """
        payload = PayloadRef(json=sessions)
        self.add_item(Item(payload=payload, type="sessions"))

    def add_item(
        self, item  # type: Item
    ):
        # type: (...) -> None
        """
        Append an already constructed item to the envelope.
        """
        self.items.append(item)

    def get_event(self):
        # type: (...) -> Optional[Event]
        """
        Return the first non-None result of ``get_event()`` over the items,
        or None when no item yields one.
        """
        for entry in self.items:
            found = entry.get_event()
            if found is not None:
                return found
        return None

    def get_transaction_event(self):
        # type: (...) -> Optional[Event]
        """
        Return the first non-None result of ``get_transaction_event()``
        over the items, or None when no item yields one.
        """
        for entry in self.items:
            found = entry.get_transaction_event()
            if found is not None:
                return found
        return None

    def __iter__(self):
        # type: (...) -> Iterator[Item]
        return iter(self.items)

    def serialize_into(
        self, f  # type: Any
    ):
        # type: (...) -> None
        """
        Write the envelope to *f*: the JSON encoded headers, a newline,
        then every item via its own ``serialize_into``.
        """
        header_line = json_dumps(self.headers)
        f.write(header_line)
        f.write(b"\n")
        for entry in self.items:
            entry.serialize_into(f)

    def serialize(self):
        # type: (...) -> bytes
        """
        Return the serialized envelope as a bytes object.
        """
        buf = io.BytesIO()
        self.serialize_into(buf)
        return buf.getvalue()

    @classmethod
    def deserialize_from(
        cls, f  # type: Any
    ):
        # type: (...) -> Envelope
        """
        Build an envelope from *f*: the first line holds the headers, then
        items are read until ``Item.deserialize_from`` returns None.
        """
        headers = parse_json(f.readline())
        collected = []
        entry = Item.deserialize_from(f)
        while entry is not None:
            collected.append(entry)
            entry = Item.deserialize_from(f)
        return cls(headers=headers, items=collected)

    @classmethod
    def deserialize(
        cls, bytes  # type: bytes
    ):
        # type: (...) -> Envelope
        """
        Build an envelope from an in-memory bytes object.
        """
        stream = io.BytesIO(bytes)
        return cls.deserialize_from(stream)

    def __repr__(self):
        # type: (...) -> str
        return "<Envelope headers=%r items=%r>" % (self.headers, self.items)

144 

145 

class PayloadRef(object):
    """
    Reference to the content of an envelope item.

    The content can be supplied as raw ``bytes``, as a file ``path`` that is
    read on first use, or as a ``json`` value that is serialized on first
    use. When several are given, ``bytes`` wins, then ``path``, then ``json``.
    """

    def __init__(
        self,
        bytes=None,  # type: Optional[bytes]
        path=None,  # type: Optional[Union[bytes, text_type]]
        json=None,  # type: Optional[Any]
    ):
        # type: (...) -> None
        self.json = json
        self.bytes = bytes
        self.path = path

    def get_bytes(self):
        # type: (...) -> bytes
        """
        Return the payload as bytes, resolving and caching it on first use.

        Always returns a bytes object: when no source is set, or when the
        file at ``path`` could not be read, the result is ``b""``.
        """
        if self.bytes is None:
            if self.path is not None:
                # NOTE(review): capture_internal_exceptions presumably
                # suppresses errors raised inside the block -- confirm. If it
                # does, a failed read leaves self.bytes as None.
                with capture_internal_exceptions():
                    with open(self.path, "rb") as f:
                        self.bytes = f.read()
            elif self.json is not None:
                self.bytes = json_dumps(self.json)
            else:
                self.bytes = b""
        if self.bytes is None:
            # The read above did not produce anything. Honor the declared
            # return type instead of handing None to callers that take
            # len() of the result. Nothing is cached, so a later call
            # tries the file again.
            return b""
        return self.bytes

    @property
    def inferred_content_type(self):
        # type: (...) -> str
        """
        Best-effort content type for the payload.

        JSON payloads are "application/json"; for file payloads the type is
        guessed from the path; everything else (including an unrecognized
        path) is "application/octet-stream".
        """
        if self.json is not None:
            return "application/json"
        elif self.path is not None:
            path = self.path
            if isinstance(path, bytes):
                # mimetypes is given a text path, so decode byte paths first
                path = path.decode("utf-8", "replace")
            ty = mimetypes.guess_type(path)[0]
            if ty:
                return ty
        return "application/octet-stream"

    def __repr__(self):
        # type: (...) -> str
        return "<Payload %r>" % (self.inferred_content_type,)

188 

189 

class Item(object):
    """
    A single envelope entry: a header mapping plus a payload.

    The serialized form is the headers as one JSON line (with a "length"
    entry added), followed by the payload bytes and a trailing newline.
    """

    def __init__(
        self,
        payload,  # type: Union[bytes, text_type, PayloadRef]
        headers=None,  # type: Optional[Dict[str, Any]]
        type=None,  # type: Optional[str]
        content_type=None,  # type: Optional[str]
        filename=None,  # type: Optional[str]
    ):
        # type: (...) -> None
        # Work on a copy so the caller's mapping is never modified by the
        # header assignments below.
        headers = dict(headers) if headers is not None else {}
        self.headers = headers

        # Raw bytes and text are wrapped; anything else is used as the
        # payload reference as-is.
        if isinstance(payload, bytes):
            payload = PayloadRef(bytes=payload)
        elif isinstance(payload, text_type):
            payload = PayloadRef(bytes=payload.encode("utf-8"))

        # Explicit keyword arguments take precedence over entries already
        # present in ``headers``.
        if filename is not None:
            headers["filename"] = filename
        if type is not None:
            headers["type"] = type
        if content_type is not None:
            headers["content_type"] = content_type
        elif "content_type" not in headers:
            headers["content_type"] = payload.inferred_content_type

        self.payload = payload

    def __repr__(self):
        # type: (...) -> str
        return "<Item headers=%r payload=%r data_category=%r>" % (
            self.headers,
            self.payload,
            self.data_category,
        )

    @property
    def type(self):
        # type: (...) -> Optional[str]
        """
        The "type" header of the item, or None when it is not set.
        """
        return self.headers.get("type")

    @property
    def data_category(self):
        # type: (...) -> EventDataCategory
        """
        Data category derived from the item's "type" header; unknown or
        missing types fall back to "default".
        """
        ty = self.headers.get("type")
        # "sessions" is the type Envelope.add_sessions assigns; it belongs
        # to the same category as a single "session" item.
        if ty in ("session", "sessions"):
            return "session"
        elif ty == "attachment":
            return "attachment"
        elif ty == "transaction":
            return "transaction"
        elif ty == "event":
            return "error"
        elif ty == "client_report":
            return "internal"
        else:
            return "default"

    def get_bytes(self):
        # type: (...) -> bytes
        """
        Return the payload content as bytes.
        """
        return self.payload.get_bytes()

    def get_event(self):
        # type: (...) -> Optional[Event]
        """
        Returns an error event if there is one.
        """
        if self.type == "event" and self.payload.json is not None:
            return self.payload.json
        return None

    def get_transaction_event(self):
        # type: (...) -> Optional[Event]
        """
        Returns a transaction event if there is one.
        """
        if self.type == "transaction" and self.payload.json is not None:
            return self.payload.json
        return None

    def serialize_into(
        self, f  # type: Any
    ):
        # type: (...) -> None
        """
        Write the item to *f*: the JSON encoded headers including the
        payload length, a newline, the payload bytes and a final newline.
        """
        # Copy first: "length" belongs to the wire format only and must not
        # end up in self.headers.
        headers = dict(self.headers)
        payload_bytes = self.get_bytes()
        headers["length"] = len(payload_bytes)
        f.write(json_dumps(headers))
        f.write(b"\n")
        f.write(payload_bytes)
        f.write(b"\n")

    def serialize(self):
        # type: (...) -> bytes
        """
        Return the serialized item as a bytes object.
        """
        out = io.BytesIO()
        self.serialize_into(out)
        return out.getvalue()

    @classmethod
    def deserialize_from(
        cls, f  # type: Any
    ):
        # type: (...) -> Optional[Item]
        """
        Read one item from *f*.

        Returns None when the next line is empty (or only whitespace),
        which is how the end of the item list is detected.
        """
        line = f.readline().rstrip()
        if not line:
            return None
        headers = parse_json(line)
        length = headers.get("length")
        if length is not None:
            payload = f.read(length)
            # consume the newline that terminates the payload
            f.readline()
        else:
            # if no length was specified we need to read up to the end of line
            # and remove it (if it is present, i.e. not the very last char in an eof terminated envelope)
            payload = f.readline().rstrip(b"\n")
        if headers.get("type") in ("event", "transaction", "metric_buckets"):
            # These item types carry JSON; keep the parsed value so that
            # get_event() / get_transaction_event() can return it.
            rv = cls(headers=headers, payload=PayloadRef(json=parse_json(payload)))
        else:
            rv = cls(headers=headers, payload=payload)
        return rv

    @classmethod
    def deserialize(
        cls, bytes  # type: bytes
    ):
        # type: (...) -> Optional[Item]
        """
        Read one item from an in-memory bytes object.
        """
        return cls.deserialize_from(io.BytesIO(bytes))

316 # type: (...) -> Optional[Item] 

317 return cls.deserialize_from(io.BytesIO(bytes))