Coverage for /var/srv/projects/api.amasfac.comuna18.com/tmp/venv/lib/python3.9/site-packages/pandas/io/sql.py: 13%
684 statements
« prev ^ index » next coverage.py v6.4.4, created at 2023-07-17 14:22 -0600
1"""
2Collection of query wrappers / abstractions to both facilitate data
3retrieval and to reduce dependency on DB-specific API.
4"""
6from __future__ import annotations
8from contextlib import contextmanager
9from datetime import (
10 date,
11 datetime,
12 time,
13)
14from functools import partial
15import re
16from typing import (
17 TYPE_CHECKING,
18 Any,
19 Iterator,
20 cast,
21 overload,
22)
23import warnings
25import numpy as np
27import pandas._libs.lib as lib
28from pandas._typing import (
29 DateTimeErrorChoices,
30 DtypeArg,
31 IndexLabel,
32)
33from pandas.compat._optional import import_optional_dependency
34from pandas.errors import (
35 AbstractMethodError,
36 DatabaseError,
37)
38from pandas.util._exceptions import find_stack_level
40from pandas.core.dtypes.common import (
41 is_datetime64tz_dtype,
42 is_dict_like,
43 is_integer,
44 is_list_like,
45)
46from pandas.core.dtypes.dtypes import DatetimeTZDtype
47from pandas.core.dtypes.missing import isna
49from pandas import get_option
50from pandas.core.api import (
51 DataFrame,
52 Series,
53)
54from pandas.core.base import PandasObject
55import pandas.core.common as com
56from pandas.core.tools.datetimes import to_datetime
58if TYPE_CHECKING: 58 ↛ 59line 58 didn't jump to line 59, because the condition on line 58 was never true
59 from sqlalchemy import Table
62# -----------------------------------------------------------------------------
63# -- Helper functions
66def _convert_params(sql, params):
67 """Convert SQL and params args to DBAPI2.0 compliant format."""
68 args = [sql]
69 if params is not None:
70 if hasattr(params, "keys"): # test if params is a mapping
71 args += [params]
72 else:
73 args += [list(params)]
74 return args
77def _process_parse_dates_argument(parse_dates):
78 """Process parse_dates argument for read_sql functions"""
79 # handle non-list entries for parse_dates gracefully
80 if parse_dates is True or parse_dates is None or parse_dates is False:
81 parse_dates = []
83 elif not hasattr(parse_dates, "__iter__"):
84 parse_dates = [parse_dates]
85 return parse_dates
88def _handle_date_column(
89 col, utc: bool | None = None, format: str | dict[str, Any] | None = None
90):
91 if isinstance(format, dict):
92 # GH35185 Allow custom error values in parse_dates argument of
93 # read_sql like functions.
94 # Format can take on custom to_datetime argument values such as
95 # {"errors": "coerce"} or {"dayfirst": True}
96 error: DateTimeErrorChoices = format.pop("errors", None) or "ignore"
97 return to_datetime(col, errors=error, **format)
98 else:
99 # Allow passing of formatting string for integers
100 # GH17855
101 if format is None and (
102 issubclass(col.dtype.type, np.floating)
103 or issubclass(col.dtype.type, np.integer)
104 ):
105 format = "s"
106 if format in ["D", "d", "h", "m", "s", "ms", "us", "ns"]:
107 return to_datetime(col, errors="coerce", unit=format, utc=utc)
108 elif is_datetime64tz_dtype(col.dtype):
109 # coerce to UTC timezone
110 # GH11216
111 return to_datetime(col, utc=True)
112 else:
113 return to_datetime(col, errors="coerce", format=format, utc=utc)
def _parse_date_columns(data_frame, parse_dates):
    """
    Force non-datetime columns to be read as such.
    Supports both string formatted and integer timestamp columns.
    """
    parse_dates = _process_parse_dates_argument(parse_dates)

    # datetime64tz columns are always coerced to UTC (GH11216); other
    # columns only when explicitly listed in parse_dates.
    for col_name, df_col in data_frame.items():
        if not (is_datetime64tz_dtype(df_col.dtype) or col_name in parse_dates):
            continue
        try:
            fmt = parse_dates[col_name]
        except TypeError:
            # parse_dates is a list, not a mapping: no per-column format
            fmt = None
        data_frame[col_name] = _handle_date_column(df_col, format=fmt)

    return data_frame
def _wrap_result(
    data,
    columns,
    index_col=None,
    coerce_float: bool = True,
    parse_dates=None,
    dtype: DtypeArg | None = None,
):
    """Wrap result set of query in a DataFrame."""
    result = DataFrame.from_records(data, columns=columns, coerce_float=coerce_float)

    # Coerce to the requested dtypes, if any, before date parsing.
    if dtype:
        result = result.astype(dtype)

    result = _parse_date_columns(result, parse_dates)

    if index_col is not None:
        result.set_index(index_col, inplace=True)

    return result
def execute(sql, con, params=None):
    """
    Execute the given SQL query using the provided connection object.

    Parameters
    ----------
    sql : string
        SQL query to be executed.
    con : SQLAlchemy connectable(engine/connection) or sqlite3 connection
        Using SQLAlchemy makes it possible to use any DB supported by the
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    params : list or tuple, optional, default: None
        List of parameters to pass to execute method.

    Returns
    -------
    Results Iterable
    """
    pandas_sql = pandasSQL_builder(con)
    # Normalize to (sql,) or (sql, params) and delegate.
    return pandas_sql.execute(*_convert_params(sql, params))
183# -----------------------------------------------------------------------------
184# -- Read and write to DataFrames
@overload
def read_sql_table(
    table_name,
    con,
    schema=...,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    parse_dates: list[str] | dict[str, str] | None = ...,
    columns: list[str] | None = ...,
    chunksize: None = ...,
) -> DataFrame:
    # typing overload: chunksize=None -> a single DataFrame is returned
    ...
@overload
def read_sql_table(
    table_name,
    con,
    schema=...,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    parse_dates: list[str] | dict[str, str] | None = ...,
    columns: list[str] | None = ...,
    chunksize: int = ...,
) -> Iterator[DataFrame]:
    # typing overload: an int chunksize -> an iterator of DataFrames
    ...
def read_sql_table(
    table_name: str,
    con,
    schema: str | None = None,
    index_col: str | list[str] | None = None,
    coerce_float: bool = True,
    parse_dates: list[str] | dict[str, str] | None = None,
    columns: list[str] | None = None,
    chunksize: int | None = None,
) -> DataFrame | Iterator[DataFrame]:
    """
    Read SQL database table into a DataFrame.

    Given a table name and a SQLAlchemy connectable, returns a DataFrame.
    This function does not support DBAPI connections.

    Parameters
    ----------
    table_name : str
        Name of SQL table in database.
    con : SQLAlchemy connectable or str
        A database URI could be provided as str.
        SQLite DBAPI connection mode not supported.
    schema : str, default None
        Name of SQL schema in database to query (if database flavor
        supports this). Uses default schema if None (default).
    index_col : str or list of str, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Can result in loss of precision.
    parse_dates : list or dict, default None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`.
          Especially useful with databases without native Datetime support,
          such as SQLite.
    columns : list, default None
        List of column names to select from SQL table.
    chunksize : int, default None
        If specified, returns an iterator where `chunksize` is the number of
        rows to include in each chunk.

    Returns
    -------
    DataFrame or Iterator[DataFrame]
        A SQL table is returned as two-dimensional data structure with labeled
        axes.

    See Also
    --------
    read_sql_query : Read SQL query into a DataFrame.
    read_sql : Read SQL query or database table into a DataFrame.

    Notes
    -----
    Any datetime values with time zone information will be converted to UTC.

    Examples
    --------
    >>> pd.read_sql_table('table_name', 'postgres:///db_name')  # doctest:+SKIP
    """
    pandas_sql = pandasSQL_builder(con, schema=schema)
    if not pandas_sql.has_table(table_name):
        raise ValueError(f"Table {table_name} not found")

    # The SQLite fallback has no read_table, hence the union-attr ignore:
    # error: Item "SQLiteDatabase" of "Union[SQLDatabase, SQLiteDatabase]"
    # has no attribute "read_table"
    table = pandas_sql.read_table(  # type: ignore[union-attr]
        table_name,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        columns=columns,
        chunksize=chunksize,
    )

    if table is None:
        raise ValueError(f"Table {table_name} not found", con)
    return table
@overload
def read_sql_query(
    sql,
    con,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    params: list[str] | dict[str, str] | None = ...,
    parse_dates: list[str] | dict[str, str] | None = ...,
    chunksize: None = ...,
    dtype: DtypeArg | None = ...,
) -> DataFrame:
    # typing overload: chunksize=None -> a single DataFrame is returned
    ...
@overload
def read_sql_query(
    sql,
    con,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    params: list[str] | dict[str, str] | None = ...,
    parse_dates: list[str] | dict[str, str] | None = ...,
    chunksize: int = ...,
    dtype: DtypeArg | None = ...,
) -> Iterator[DataFrame]:
    # typing overload: an int chunksize -> an iterator of DataFrames
    ...
def read_sql_query(
    sql,
    con,
    index_col: str | list[str] | None = None,
    coerce_float: bool = True,
    params: list[str] | dict[str, str] | None = None,
    parse_dates: list[str] | dict[str, str] | None = None,
    chunksize: int | None = None,
    dtype: DtypeArg | None = None,
) -> DataFrame | Iterator[DataFrame]:
    """
    Read SQL query into a DataFrame.

    Returns a DataFrame corresponding to the result set of the query
    string. Optionally provide an `index_col` parameter to use one of the
    columns as the index, otherwise default integer index will be used.

    Parameters
    ----------
    sql : str SQL query or SQLAlchemy Selectable (select or text object)
        SQL query to be executed.
    con : SQLAlchemy connectable, str, or sqlite3 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library. If a DBAPI2 object, only sqlite3 is supported.
    index_col : str or list of str, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point. Useful for SQL result sets.
    params : list, tuple or dict, optional, default: None
        List of parameters to pass to execute method. The syntax used
        to pass parameters is database driver dependent. Check your
        database driver documentation for which of the five syntax styles,
        described in PEP 249's paramstyle, is supported.
        Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}.
    parse_dates : list or dict, default: None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times, or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`.
          Especially useful with databases without native Datetime support,
          such as SQLite.
    chunksize : int, default None
        If specified, return an iterator where `chunksize` is the number of
        rows to include in each chunk.
    dtype : Type name or dict of columns
        Data type for data or columns. E.g. np.float64 or
        {'a': np.float64, 'b': np.int32, 'c': 'Int64'}.

        .. versionadded:: 1.3.0

    Returns
    -------
    DataFrame or Iterator[DataFrame]

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    read_sql : Read SQL query or database table into a DataFrame.

    Notes
    -----
    Any datetime values with time zone information parsed via the `parse_dates`
    parameter will be converted to UTC.
    """
    # Build the appropriate PandasSQL wrapper and delegate directly.
    return pandasSQL_builder(con).read_query(
        sql,
        index_col=index_col,
        params=params,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        chunksize=chunksize,
        dtype=dtype,
    )
@overload
def read_sql(
    sql,
    con,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    params=...,
    parse_dates=...,
    columns: list[str] = ...,
    chunksize: None = ...,
) -> DataFrame:
    # typing overload: chunksize=None -> a single DataFrame is returned
    ...
@overload
def read_sql(
    sql,
    con,
    index_col: str | list[str] | None = ...,
    coerce_float=...,
    params=...,
    parse_dates=...,
    columns: list[str] = ...,
    chunksize: int = ...,
) -> Iterator[DataFrame]:
    # typing overload: an int chunksize -> an iterator of DataFrames
    ...
def read_sql(
    sql,
    con,
    index_col: str | list[str] | None = None,
    coerce_float: bool = True,
    params=None,
    parse_dates=None,
    columns: list[str] | None = None,
    chunksize: int | None = None,
) -> DataFrame | Iterator[DataFrame]:
    """
    Read SQL query or database table into a DataFrame.

    This function is a convenience wrapper around ``read_sql_table`` and
    ``read_sql_query`` (for backward compatibility). It will delegate
    to the specific function depending on the provided input. A SQL query
    will be routed to ``read_sql_query``, while a database table name will
    be routed to ``read_sql_table``. Note that the delegated function might
    have more specific notes about their functionality not listed here.

    Parameters
    ----------
    sql : str or SQLAlchemy Selectable (select or text object)
        SQL query to be executed or a table name.
    con : SQLAlchemy connectable, str, or sqlite3 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library. If a DBAPI2 object, only sqlite3 is supported. The user is
        responsible for engine disposal and connection closure for the
        SQLAlchemy connectable; str connections are closed automatically. See
        `here <https://docs.sqlalchemy.org/en/13/core/connections.html>`_.
    index_col : str or list of str, optional, default: None
        Column(s) to set as index(MultiIndex).
    coerce_float : bool, default True
        Attempts to convert values of non-string, non-numeric objects (like
        decimal.Decimal) to floating point, useful for SQL result sets.
    params : list, tuple or dict, optional, default: None
        List of parameters to pass to execute method. The syntax used
        to pass parameters is database driver dependent. Check your
        database driver documentation for which of the five syntax styles,
        described in PEP 249's paramstyle, is supported.
        Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}.
    parse_dates : list or dict, default: None
        - List of column names to parse as dates.
        - Dict of ``{column_name: format string}`` where format string is
          strftime compatible in case of parsing string times, or is one of
          (D, s, ns, ms, us) in case of parsing integer timestamps.
        - Dict of ``{column_name: arg dict}``, where the arg dict corresponds
          to the keyword arguments of :func:`pandas.to_datetime`.
          Especially useful with databases without native Datetime support,
          such as SQLite.
    columns : list, default: None
        List of column names to select from SQL table (only used when reading
        a table).
    chunksize : int, default None
        If specified, return an iterator where `chunksize` is the
        number of rows to include in each chunk.

    Returns
    -------
    DataFrame or Iterator[DataFrame]

    See Also
    --------
    read_sql_table : Read SQL database table into a DataFrame.
    read_sql_query : Read SQL query into a DataFrame.

    Examples
    --------
    Read data from SQL via either a SQL query or a SQL tablename.
    When using a SQLite database only SQL queries are accepted,
    providing only the SQL tablename will result in an error.

    >>> from sqlite3 import connect
    >>> conn = connect(':memory:')
    >>> df = pd.DataFrame(data=[[0, '10/11/12'], [1, '12/11/10']],
    ...                   columns=['int_column', 'date_column'])
    >>> df.to_sql('test_data', conn)
    2

    >>> pd.read_sql('SELECT int_column, date_column FROM test_data', conn)
       int_column date_column
    0           0    10/11/12
    1           1    12/11/10

    >>> pd.read_sql('test_data', 'postgres:///db_name')  # doctest:+SKIP

    Apply date parsing to columns through the ``parse_dates`` argument.
    Custom argument values for applying ``pd.to_datetime`` on a column are
    specified via a dictionary format:

    1. Ignore errors while parsing the values of "date_column"

    >>> pd.read_sql('SELECT int_column, date_column FROM test_data',
    ...             conn,
    ...             parse_dates={"date_column": {"errors": "ignore"}})
       int_column date_column
    0           0  2012-10-11
    1           1  2010-12-11

    2. Apply a dayfirst date parsing order on the values of "date_column"

    >>> pd.read_sql('SELECT int_column, date_column FROM test_data',
    ...             conn,
    ...             parse_dates={"date_column": {"dayfirst": True}})
       int_column date_column
    0           0  2012-11-10
    1           1  2010-11-12

    3. Apply custom formatting when date parsing the values of "date_column"

    >>> pd.read_sql('SELECT int_column, date_column FROM test_data',
    ...             conn,
    ...             parse_dates={"date_column": {"format": "%d/%m/%y"}})
       int_column date_column
    0           0  2012-11-10
    1           1  2010-11-12
    """
    pandas_sql = pandasSQL_builder(con)

    # The sqlite3 fallback always treats the input as a query.
    if isinstance(pandas_sql, SQLiteDatabase):
        return pandas_sql.read_query(
            sql,
            index_col=index_col,
            params=params,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            chunksize=chunksize,
        )

    try:
        _is_table_name = pandas_sql.has_table(sql)
    except Exception:
        # using generic exception to catch errors from sql drivers (GH24988)
        _is_table_name = False

    if not _is_table_name:
        return pandas_sql.read_query(
            sql,
            index_col=index_col,
            params=params,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            chunksize=chunksize,
        )

    pandas_sql.meta.reflect(bind=pandas_sql.connectable, only=[sql])
    return pandas_sql.read_table(
        sql,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        columns=columns,
        chunksize=chunksize,
    )
def to_sql(
    frame,
    name: str,
    con,
    schema: str | None = None,
    if_exists: str = "fail",
    index: bool = True,
    index_label: IndexLabel = None,
    chunksize: int | None = None,
    dtype: DtypeArg | None = None,
    method: str | None = None,
    engine: str = "auto",
    **engine_kwargs,
) -> int | None:
    """
    Write records stored in a DataFrame to a SQL database.

    Parameters
    ----------
    frame : DataFrame, Series
    name : str
        Name of SQL table.
    con : SQLAlchemy connectable(engine/connection) or database string URI
        or sqlite3 DBAPI2 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    schema : str, optional
        Name of SQL schema in database to write to (if database flavor
        supports this). If None, use default schema (default).
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        - fail: If table exists, raise a ValueError.
        - replace: If table exists, drop it, recreate it, and insert data.
        - append: If table exists, insert data. Create if does not exist.
    index : bool, default True
        Write DataFrame index as a column.
    index_label : str or sequence, optional
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    chunksize : int, optional
        Specify the number of rows in each batch to be written at a time.
        By default, all rows will be written at once.
    dtype : dict or scalar, optional
        Specifying the datatype for columns. If a dictionary is used, the
        keys should be the column names and the values should be the
        SQLAlchemy types or strings for the sqlite3 fallback mode. If a
        scalar is provided, it will be applied to all columns.
    method : {None, 'multi', callable}, optional
        Controls the SQL insertion clause used:

        - None : Uses standard SQL ``INSERT`` clause (one per row).
        - ``'multi'``: Pass multiple values in a single ``INSERT`` clause.
        - callable with signature ``(pd_table, conn, keys, data_iter) -> int | None``.

        Details and a sample callable implementation can be found in the
        section :ref:`insert method <io.sql.method>`.
    engine : {'auto', 'sqlalchemy'}, default 'auto'
        SQL engine library to use. If 'auto', then the option
        ``io.sql.engine`` is used. The default ``io.sql.engine``
        behavior is 'sqlalchemy'

        .. versionadded:: 1.3.0

    **engine_kwargs
        Any additional kwargs are passed to the engine.

    Returns
    -------
    None or int
        Number of rows affected by to_sql. None is returned if the callable
        passed into ``method`` does not return an integer number of rows.

        .. versionadded:: 1.4.0

    Notes
    -----
    The returned rows affected is the sum of the ``rowcount`` attribute of ``sqlite3.Cursor``
    or SQLAlchemy connectable. The returned value may not reflect the exact number of written
    rows as stipulated in the
    `sqlite3 <https://docs.python.org/3/library/sqlite3.html#sqlite3.Cursor.rowcount>`__ or
    `SQLAlchemy <https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.BaseCursorResult.rowcount>`__
    """  # noqa:E501
    # Validate if_exists up front so a typo fails before touching the DB.
    if if_exists not in ("fail", "replace", "append"):
        raise ValueError(f"'{if_exists}' is not valid for if_exists")

    pandas_sql = pandasSQL_builder(con, schema=schema)

    if isinstance(frame, Series):
        frame = frame.to_frame()
    elif not isinstance(frame, DataFrame):
        raise NotImplementedError(
            "'frame' argument should be either a Series or a DataFrame"
        )

    return pandas_sql.to_sql(
        frame,
        name,
        if_exists=if_exists,
        index=index,
        index_label=index_label,
        schema=schema,
        chunksize=chunksize,
        dtype=dtype,
        method=method,
        engine=engine,
        **engine_kwargs,
    )
def has_table(table_name: str, con, schema: str | None = None) -> bool:
    """
    Check if DataBase has named table.

    Parameters
    ----------
    table_name: string
        Name of SQL table.
    con: SQLAlchemy connectable(engine/connection) or sqlite3 DBAPI2 connection
        Using SQLAlchemy makes it possible to use any DB supported by that
        library.
        If a DBAPI2 object, only sqlite3 is supported.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor supports
        this). If None, use default schema (default).

    Returns
    -------
    boolean
    """
    return pandasSQL_builder(con, schema=schema).has_table(table_name)


# Backwards-compatible alias.
table_exists = has_table
def pandasSQL_builder(con, schema: str | None = None) -> SQLDatabase | SQLiteDatabase:
    """
    Convenience function to return the correct PandasSQL subclass based on the
    provided parameters.

    A sqlite3 connection (or None) maps to the SQLite fallback; a URI string
    or SQLAlchemy connectable maps to SQLDatabase; any other DBAPI2 object
    falls back to SQLiteDatabase with a warning.
    """
    # sqlite3 is only needed here, so keep the import local.
    # NOTE: the previous local ``import warnings`` was redundant — the module
    # already imports warnings at the top of the file.
    import sqlite3

    if isinstance(con, sqlite3.Connection) or con is None:
        return SQLiteDatabase(con)

    sqlalchemy = import_optional_dependency("sqlalchemy", errors="ignore")

    if isinstance(con, str):
        if sqlalchemy is None:
            raise ImportError("Using URI string without sqlalchemy installed.")
        con = sqlalchemy.create_engine(con)

    if sqlalchemy is not None and isinstance(con, sqlalchemy.engine.Connectable):
        return SQLDatabase(con, schema=schema)

    warnings.warn(
        "pandas only supports SQLAlchemy connectable (engine/connection) or "
        "database string URI or sqlite3 DBAPI2 connection. "
        "Other DBAPI2 objects are not tested. Please consider using SQLAlchemy.",
        UserWarning,
        stacklevel=find_stack_level(),
    )
    return SQLiteDatabase(con)
769class SQLTable(PandasObject):
770 """
771 For mapping Pandas tables to SQL tables.
772 Uses fact that table is reflected by SQLAlchemy to
773 do better type conversions.
774 Also holds various flags needed to avoid having to
775 pass them between functions all the time.
776 """
778 # TODO: support for multiIndex
780 def __init__(
781 self,
782 name: str,
783 pandas_sql_engine,
784 frame=None,
785 index: bool | str | list[str] | None = True,
786 if_exists: str = "fail",
787 prefix: str = "pandas",
788 index_label=None,
789 schema=None,
790 keys=None,
791 dtype: DtypeArg | None = None,
792 ) -> None:
793 self.name = name
794 self.pd_sql = pandas_sql_engine
795 self.prefix = prefix
796 self.frame = frame
797 self.index = self._index_name(index, index_label)
798 self.schema = schema
799 self.if_exists = if_exists
800 self.keys = keys
801 self.dtype = dtype
803 if frame is not None:
804 # We want to initialize based on a dataframe
805 self.table = self._create_table_setup()
806 else:
807 # no data provided, read-only mode
808 self.table = self.pd_sql.get_table(self.name, self.schema)
810 if self.table is None:
811 raise ValueError(f"Could not init table '{name}'")
    def exists(self):
        """Return whether the table is already present in the database."""
        return self.pd_sql.has_table(self.name, self.schema)
    def sql_schema(self) -> str:
        """Return the CREATE TABLE statement for this table as a string."""
        from sqlalchemy.schema import CreateTable

        # Compile against the engine's connectable so dialect-specific
        # type names are rendered.
        return str(CreateTable(self.table).compile(self.pd_sql.connectable))
    def _execute_create(self):
        """Create the table in the database, attaching it to our MetaData."""
        # Inserting table into database, add to MetaData object
        self.table = self.table.to_metadata(self.pd_sql.meta)
        self.table.create(bind=self.pd_sql.connectable)
826 def create(self) -> None:
827 if self.exists():
828 if self.if_exists == "fail":
829 raise ValueError(f"Table '{self.name}' already exists.")
830 elif self.if_exists == "replace":
831 self.pd_sql.drop_table(self.name, self.schema)
832 self._execute_create()
833 elif self.if_exists == "append":
834 pass
835 else:
836 raise ValueError(f"'{self.if_exists}' is not valid for if_exists")
837 else:
838 self._execute_create()
840 def _execute_insert(self, conn, keys: list[str], data_iter) -> int:
841 """
842 Execute SQL statement inserting data
844 Parameters
845 ----------
846 conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
847 keys : list of str
848 Column names
849 data_iter : generator of list
850 Each item contains a list of values to be inserted
851 """
852 data = [dict(zip(keys, row)) for row in data_iter]
853 result = conn.execute(self.table.insert(), data)
854 return result.rowcount
856 def _execute_insert_multi(self, conn, keys: list[str], data_iter) -> int:
857 """
858 Alternative to _execute_insert for DBs support multivalue INSERT.
860 Note: multi-value insert is usually faster for analytics DBs
861 and tables containing a few columns
862 but performance degrades quickly with increase of columns.
863 """
865 from sqlalchemy import insert
867 data = [dict(zip(keys, row)) for row in data_iter]
868 stmt = insert(self.table).values(data)
869 result = conn.execute(stmt)
870 return result.rowcount
    def insert_data(self) -> tuple[list[str], list[np.ndarray]]:
        """
        Convert ``self.frame`` into column names plus per-column object
        ndarrays ready for DBAPI insertion.

        Returns
        -------
        tuple
            ``(column_names, data_list)``; index level(s) appear first when
            ``self.index`` is set, since they are reset into columns.
        """
        if self.index is not None:
            # Reset the index into regular columns, named per self.index.
            temp = self.frame.copy()
            temp.index.names = self.index
            try:
                temp.reset_index(inplace=True)
            except ValueError as err:
                raise ValueError(f"duplicate name in index/columns: {err}") from err
        else:
            temp = self.frame

        column_names = list(map(str, temp.columns))
        ncols = len(column_names)
        # this just pre-allocates the list: None's will be replaced with ndarrays
        # error: List item 0 has incompatible type "None"; expected "ndarray"
        data_list: list[np.ndarray] = [None] * ncols  # type: ignore[list-item]

        for i, (_, ser) in enumerate(temp.items()):
            vals = ser._values
            if vals.dtype.kind == "M":
                # datetime64: convert to python datetime objects
                d = vals.to_pydatetime()
            elif vals.dtype.kind == "m":
                # store as integers, see GH#6921, GH#7076
                d = vals.view("i8").astype(object)
            else:
                d = vals.astype(object)

            assert isinstance(d, np.ndarray), type(d)

            if ser._can_hold_na:
                # Note: this will miss timedeltas since they are converted to int
                mask = isna(d)
                d[mask] = None

            data_list[i] = d

        return column_names, data_list
910 def insert(
911 self, chunksize: int | None = None, method: str | None = None
912 ) -> int | None:
914 # set insert method
915 if method is None:
916 exec_insert = self._execute_insert
917 elif method == "multi":
918 exec_insert = self._execute_insert_multi
919 elif callable(method):
920 exec_insert = partial(method, self)
921 else:
922 raise ValueError(f"Invalid parameter `method`: {method}")
924 keys, data_list = self.insert_data()
926 nrows = len(self.frame)
928 if nrows == 0:
929 return 0
931 if chunksize is None:
932 chunksize = nrows
933 elif chunksize == 0:
934 raise ValueError("chunksize argument should be non-zero")
936 chunks = (nrows // chunksize) + 1
937 total_inserted = None
938 with self.pd_sql.run_transaction() as conn:
939 for i in range(chunks):
940 start_i = i * chunksize
941 end_i = min((i + 1) * chunksize, nrows)
942 if start_i >= end_i:
943 break
945 chunk_iter = zip(*(arr[start_i:end_i] for arr in data_list))
946 num_inserted = exec_insert(conn, keys, chunk_iter)
947 # GH 46891
948 if is_integer(num_inserted):
949 if total_inserted is None:
950 total_inserted = num_inserted
951 else:
952 total_inserted += num_inserted
953 return total_inserted
    def _query_iterator(
        self,
        result,
        chunksize: int | None,  # annotation corrected: was ``str | None``
        columns,
        coerce_float: bool = True,
        parse_dates=None,
    ):
        """Return generator through chunked result set."""
        has_read_data = False
        while True:
            data = result.fetchmany(chunksize)
            if not data:
                if not has_read_data:
                    # Empty result: still yield one empty frame with the
                    # expected columns so callers always get a DataFrame.
                    yield DataFrame.from_records(
                        [], columns=columns, coerce_float=coerce_float
                    )
                break
            else:
                has_read_data = True
                self.frame = DataFrame.from_records(
                    data, columns=columns, coerce_float=coerce_float
                )

                self._harmonize_columns(parse_dates=parse_dates)

                if self.index is not None:
                    self.frame.set_index(self.index, inplace=True)

                yield self.frame
986 def read(
987 self,
988 coerce_float: bool = True,
989 parse_dates=None,
990 columns=None,
991 chunksize=None,
992 ) -> DataFrame | Iterator[DataFrame]:
993 from sqlalchemy import select
995 if columns is not None and len(columns) > 0:
996 cols = [self.table.c[n] for n in columns]
997 if self.index is not None:
998 for idx in self.index[::-1]:
999 cols.insert(0, self.table.c[idx])
1000 sql_select = select(*cols)
1001 else:
1002 sql_select = select(self.table)
1003 result = self.pd_sql.execute(sql_select)
1004 column_names = result.keys()
1006 if chunksize is not None:
1007 return self._query_iterator(
1008 result,
1009 chunksize,
1010 column_names,
1011 coerce_float=coerce_float,
1012 parse_dates=parse_dates,
1013 )
1014 else:
1015 data = result.fetchall()
1016 self.frame = DataFrame.from_records(
1017 data, columns=column_names, coerce_float=coerce_float
1018 )
1020 self._harmonize_columns(parse_dates=parse_dates)
1022 if self.index is not None:
1023 self.frame.set_index(self.index, inplace=True)
1025 return self.frame
1027 def _index_name(self, index, index_label):
1028 # for writing: index=True to include index in sql table
1029 if index is True:
1030 nlevels = self.frame.index.nlevels
1031 # if index_label is specified, set this as index name(s)
1032 if index_label is not None:
1033 if not isinstance(index_label, list):
1034 index_label = [index_label]
1035 if len(index_label) != nlevels:
1036 raise ValueError(
1037 "Length of 'index_label' should match number of "
1038 f"levels, which is {nlevels}"
1039 )
1040 else:
1041 return index_label
1042 # return the used column labels for the index columns
1043 if (
1044 nlevels == 1
1045 and "index" not in self.frame.columns
1046 and self.frame.index.name is None
1047 ):
1048 return ["index"]
1049 else:
1050 return com.fill_missing_names(self.frame.index.names)
1052 # for reading: index=(list of) string to specify column to set as index
1053 elif isinstance(index, str):
1054 return [index]
1055 elif isinstance(index, list):
1056 return index
1057 else:
1058 return None
1060 def _get_column_names_and_types(self, dtype_mapper):
1061 column_names_and_types = []
1062 if self.index is not None:
1063 for i, idx_label in enumerate(self.index):
1064 idx_type = dtype_mapper(self.frame.index._get_level_values(i))
1065 column_names_and_types.append((str(idx_label), idx_type, True))
1067 column_names_and_types += [
1068 (str(self.frame.columns[i]), dtype_mapper(self.frame.iloc[:, i]), False)
1069 for i in range(len(self.frame.columns))
1070 ]
1072 return column_names_and_types
def _create_table_setup(self):
    """
    Build (but do not create) a SQLAlchemy ``Table`` mirroring the frame,
    including an optional primary-key constraint from ``self.keys``.
    """
    from sqlalchemy import (
        Column,
        PrimaryKeyConstraint,
        Table,
    )
    from sqlalchemy.schema import MetaData

    column_names_and_types = self._get_column_names_and_types(
        self._sqlalchemy_type
    )

    table_args = [
        Column(name, typ, index=is_index)
        for name, typ, is_index in column_names_and_types
    ]

    if self.keys is not None:
        keys = self.keys if is_list_like(self.keys) else [self.keys]
        table_args.append(
            PrimaryKeyConstraint(*keys, name=self.name + "_pk")
        )

    schema = self.schema or self.pd_sql.meta.schema

    # Attach to a throw-away MetaData; the table is only bound to
    # self.meta once it has actually been created in the database.
    return Table(self.name, MetaData(), *table_args, schema=schema)
def _harmonize_columns(self, parse_dates=None):
    """
    Make the DataFrame's column types align with the SQL table
    column types.

    Need to work around limited NA value support. Floats are always
    fine, ints must always be floats if there are Null values.
    Booleans are hard because converting bool column with None replaces
    all Nones with false. Therefore only convert bool if there are no
    NA values.
    Datetimes should already be converted to np.datetime64 if supported,
    but here we also force conversion if required.

    Parameters
    ----------
    parse_dates : list or dict, optional
        Column names to parse as dates, optionally mapping a name to a
        format string or ``to_datetime`` keyword arguments.
    """
    parse_dates = _process_parse_dates_argument(parse_dates)

    for sql_col in self.table.columns:
        col_name = sql_col.name
        try:
            df_col = self.frame[col_name]

            # Handle date parsing upfront; don't try to convert columns
            # twice
            if col_name in parse_dates:
                try:
                    fmt = parse_dates[col_name]
                except TypeError:
                    # parse_dates is a list, not a dict: no format given.
                    fmt = None
                self.frame[col_name] = _handle_date_column(df_col, format=fmt)
                continue

            # the type the dataframe column should have
            col_type = self._get_dtype(sql_col.type)

            if (
                col_type is datetime
                or col_type is date
                or col_type is DatetimeTZDtype
            ):
                # Convert tz-aware Datetime SQL columns to UTC
                utc = col_type is DatetimeTZDtype
                self.frame[col_name] = _handle_date_column(df_col, utc=utc)
            elif col_type is float:
                # floats support NA, can always convert!
                self.frame[col_name] = df_col.astype(col_type, copy=False)

            elif len(df_col) == df_col.count():
                # No NA values, can convert ints and bools
                if col_type is np.dtype("int64") or col_type is bool:
                    self.frame[col_name] = df_col.astype(col_type, copy=False)
        except KeyError:
            pass  # this column not in results
def _sqlalchemy_type(self, col):
    """
    Map a pandas column (or index level) to a SQLAlchemy column type,
    honouring any explicit type the caller supplied via ``self.dtype``.
    """
    dtype: DtypeArg = self.dtype or {}
    if is_dict_like(dtype):
        dtype = cast(dict, dtype)
        if col.name in dtype:
            # Caller supplied an explicit SQLAlchemy type for this column.
            return dtype[col.name]

    # Infer type of column, while ignoring missing values.
    # Needed for inserting typed data containing NULLs, GH 8778.
    col_type = lib.infer_dtype(col, skipna=True)

    from sqlalchemy.types import (
        TIMESTAMP,
        BigInteger,
        Boolean,
        Date,
        DateTime,
        Float,
        Integer,
        SmallInteger,
        Text,
        Time,
    )

    if col_type in ("datetime64", "datetime"):
        # GH 9086: TIMESTAMP is the suggested type if the column contains
        # timezone information
        try:
            tz = col.dt.tz
        except AttributeError:
            # The column is actually a DatetimeIndex
            # GH 26761 or an Index with date-like data e.g. 9999-01-01
            tz = getattr(col, "tz", None)
        return TIMESTAMP(timezone=True) if tz is not None else DateTime
    if col_type == "timedelta64":
        warnings.warn(
            "the 'timedelta' type is not supported, and will be "
            "written as integer values (ns frequency) to the database.",
            UserWarning,
            stacklevel=find_stack_level(),
        )
        return BigInteger
    if col_type == "floating":
        # float32 -> single precision; everything else -> double.
        if col.dtype == "float32":
            return Float(precision=23)
        return Float(precision=53)
    if col_type == "integer":
        # GH35076 Map pandas integer to optimal SQLAlchemy integer type
        dtype_name = col.dtype.name.lower()
        if dtype_name in ("int8", "uint8", "int16"):
            return SmallInteger
        if dtype_name in ("uint16", "int32"):
            return Integer
        if dtype_name == "uint64":
            raise ValueError("Unsigned 64 bit integer datatype is not supported")
        return BigInteger
    if col_type == "boolean":
        return Boolean
    if col_type == "date":
        return Date
    if col_type == "time":
        return Time
    if col_type == "complex":
        raise ValueError("Complex datatypes not supported")

    return Text
def _get_dtype(self, sqltype):
    """
    Translate a SQLAlchemy column type into the numpy/python type the
    DataFrame column should have.  Falls back to ``object``.
    """
    from sqlalchemy.types import (
        TIMESTAMP,
        Boolean,
        Date,
        DateTime,
        Float,
        Integer,
    )

    if isinstance(sqltype, Float):
        return float
    if isinstance(sqltype, Integer):
        # TODO: Refine integer size.
        return np.dtype("int64")
    # NOTE: TIMESTAMP subclasses DateTime, so it must be checked first.
    if isinstance(sqltype, TIMESTAMP):
        # we have a timezone capable type; only use the tz-aware dtype
        # when the column actually carries a timezone.
        return DatetimeTZDtype if sqltype.timezone else datetime
    if isinstance(sqltype, DateTime):
        # Caution: np.datetime64 is also a subclass of np.number.
        return datetime
    if isinstance(sqltype, Date):
        return date
    if isinstance(sqltype, Boolean):
        return bool
    return object
class PandasSQL(PandasObject):
    """
    Abstract base for the SQL backends.

    Subclasses should define read_sql and to_sql.
    """

    def read_sql(self, *args, **kwargs):
        # Abstract: only connection-backed subclasses can actually read.
        raise ValueError(
            "PandasSQL must be created with an SQLAlchemy "
            "connectable or sqlite connection"
        )

    def to_sql(
        self,
        frame,
        name,
        if_exists: str = "fail",
        index: bool = True,
        index_label=None,
        schema=None,
        chunksize=None,
        dtype: DtypeArg | None = None,
        method=None,
    ) -> int | None:
        # Abstract: only connection-backed subclasses can actually write.
        raise ValueError(
            "PandasSQL must be created with an SQLAlchemy "
            "connectable or sqlite connection"
        )
class BaseEngine:
    """
    Abstract interface for the engines that perform the actual row
    insertion behind ``to_sql``.
    """

    def insert_records(
        self,
        table: SQLTable,
        con,
        frame,
        name,
        index=True,
        schema=None,
        chunksize=None,
        method=None,
        **engine_kwargs,
    ) -> int | None:
        """
        Inserts data into already-prepared table

        ``table`` is the prepared :class:`SQLTable`; the remaining
        arguments are forwarded from ``SQLDatabase.to_sql``.  Returns the
        number of rows inserted when the implementation can report it.
        """
        raise AbstractMethodError(self)
class SQLAlchemyEngine(BaseEngine):
    """Default insertion engine, backed by SQLAlchemy."""

    def __init__(self) -> None:
        # Fail fast with a helpful message when sqlalchemy is missing.
        import_optional_dependency(
            "sqlalchemy", extra="sqlalchemy is required for SQL support."
        )

    def insert_records(
        self,
        table: SQLTable,
        con,
        frame,
        name,
        index=True,
        schema=None,
        chunksize=None,
        method=None,
        **engine_kwargs,
    ) -> int | None:
        """Insert via the prepared SQLTable; returns rows inserted (or None)."""
        from sqlalchemy import exc

        try:
            return table.insert(chunksize=chunksize, method=method)
        except exc.SQLAlchemyError as err:
            # GH34431
            # https://stackoverflow.com/a/67358288/6067848
            # MySQL reports inf values as an "Unknown column" error (or a
            # driver-specific message); translate to a clearer ValueError.
            # The (?#...) regex comment absorbs the literal newline/indent.
            msg = r"""(\(1054, "Unknown column 'inf(e0)?' in 'field list'"\))(?#
            )|inf can not be used with MySQL"""
            err_text = str(err.orig)
            if re.search(msg, err_text):
                raise ValueError("inf cannot be used with MySQL") from err
            else:
                raise err
def get_engine(engine: str) -> BaseEngine:
    """
    Return the BaseEngine implementation selected by ``engine``
    ('auto' consults the ``io.sql.engine`` option).
    """
    if engine == "auto":
        engine = get_option("io.sql.engine")

    if engine == "auto":
        # try engines in this order
        candidates = [SQLAlchemyEngine]

        failures = []
        for candidate in candidates:
            try:
                return candidate()
            except ImportError as err:
                failures.append(str(err))

        error_msgs = "".join("\n - " + msg for msg in failures)
        raise ImportError(
            "Unable to find a usable engine; "
            "tried using: 'sqlalchemy'.\n"
            "A suitable version of "
            "sqlalchemy is required for sql I/O "
            "support.\n"
            "Trying to import the above resulted in these errors:"
            f"{error_msgs}"
        )

    if engine == "sqlalchemy":
        return SQLAlchemyEngine()

    raise ValueError("engine must be one of 'auto', 'sqlalchemy'")
class SQLDatabase(PandasSQL):
    """
    This class enables conversion between DataFrame and SQL databases
    using SQLAlchemy to handle DataBase abstraction.

    Parameters
    ----------
    engine : SQLAlchemy connectable
        Connectable to connect with the database. Using SQLAlchemy makes it
        possible to use any DB supported by that library.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor
        supports this). If None, use default schema (default).

    """

    def __init__(self, engine, schema: str | None = None) -> None:
        from sqlalchemy.schema import MetaData

        # engine may be an Engine or an already-open Connection.
        self.connectable = engine
        self.meta = MetaData(schema=schema)

    @contextmanager
    def run_transaction(self):
        # Yield a connection wrapped in a transaction when given an
        # Engine; a plain Connection is assumed to manage its own scope.
        from sqlalchemy.engine import Engine

        if isinstance(self.connectable, Engine):
            with self.connectable.connect() as conn:
                with conn.begin():
                    yield conn
        else:
            yield self.connectable

    def execute(self, *args, **kwargs):
        """Simple passthrough to SQLAlchemy connectable"""
        return self.connectable.execution_options().execute(*args, **kwargs)

    def read_table(
        self,
        table_name: str,
        index_col: str | list[str] | None = None,
        coerce_float: bool = True,
        parse_dates=None,
        columns=None,
        schema: str | None = None,
        chunksize: int | None = None,
    ) -> DataFrame | Iterator[DataFrame]:
        """
        Read SQL database table into a DataFrame.

        Parameters
        ----------
        table_name : str
            Name of SQL table in database.
        index_col : string, optional, default: None
            Column to set as index.
        coerce_float : bool, default True
            Attempts to convert values of non-string, non-numeric objects
            (like decimal.Decimal) to floating point. This can result in
            loss of precision.
        parse_dates : list or dict, default: None
            - List of column names to parse as dates.
            - Dict of ``{column_name: format string}`` where format string is
              strftime compatible in case of parsing string times, or is one of
              (D, s, ns, ms, us) in case of parsing integer timestamps.
            - Dict of ``{column_name: arg}``, where the arg corresponds
              to the keyword arguments of :func:`pandas.to_datetime`.
              Especially useful with databases without native Datetime support,
              such as SQLite.
        columns : list, default: None
            List of column names to select from SQL table.
        schema : string, default None
            Name of SQL schema in database to query (if database flavor
            supports this). If specified, this overwrites the default
            schema of the SQL database object.
        chunksize : int, default None
            If specified, return an iterator where `chunksize` is the number
            of rows to include in each chunk.

        Returns
        -------
        DataFrame

        See Also
        --------
        pandas.read_sql_table
        SQLDatabase.read_query

        """
        table = SQLTable(table_name, self, index=index_col, schema=schema)
        return table.read(
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            columns=columns,
            chunksize=chunksize,
        )

    @staticmethod
    def _query_iterator(
        result,
        chunksize: int,
        columns,
        index_col=None,
        coerce_float=True,
        parse_dates=None,
        dtype: DtypeArg | None = None,
    ):
        """Return generator through chunked result set"""
        has_read_data = False
        while True:
            data = result.fetchmany(chunksize)
            if not data:
                if not has_read_data:
                    # Completely empty result: still yield one empty,
                    # correctly-labelled frame.
                    # NOTE(review): dtype is not forwarded here — the empty
                    # frame keeps default dtypes; confirm intended.
                    yield _wrap_result(
                        [],
                        columns,
                        index_col=index_col,
                        coerce_float=coerce_float,
                        parse_dates=parse_dates,
                    )
                break
            else:
                has_read_data = True
                yield _wrap_result(
                    data,
                    columns,
                    index_col=index_col,
                    coerce_float=coerce_float,
                    parse_dates=parse_dates,
                    dtype=dtype,
                )

    def read_query(
        self,
        sql: str,
        index_col: str | list[str] | None = None,
        coerce_float: bool = True,
        parse_dates=None,
        params=None,
        chunksize: int | None = None,
        dtype: DtypeArg | None = None,
    ) -> DataFrame | Iterator[DataFrame]:
        """
        Read SQL query into a DataFrame.

        Parameters
        ----------
        sql : str
            SQL query to be executed.
        index_col : string, optional, default: None
            Column name to use as index for the returned DataFrame object.
        coerce_float : bool, default True
            Attempt to convert values of non-string, non-numeric objects (like
            decimal.Decimal) to floating point, useful for SQL result sets.
        params : list, tuple or dict, optional, default: None
            List of parameters to pass to execute method.  The syntax used
            to pass parameters is database driver dependent. Check your
            database driver documentation for which of the five syntax styles,
            described in PEP 249's paramstyle, is supported.
            Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}
        parse_dates : list or dict, default: None
            - List of column names to parse as dates.
            - Dict of ``{column_name: format string}`` where format string is
              strftime compatible in case of parsing string times, or is one of
              (D, s, ns, ms, us) in case of parsing integer timestamps.
            - Dict of ``{column_name: arg dict}``, where the arg dict
              corresponds to the keyword arguments of
              :func:`pandas.to_datetime` Especially useful with databases
              without native Datetime support, such as SQLite.
        chunksize : int, default None
            If specified, return an iterator where `chunksize` is the number
            of rows to include in each chunk.
        dtype : Type name or dict of columns
            Data type for data or columns. E.g. np.float64 or
            {'a': np.float64, 'b': np.int32, 'c': 'Int64'}

            .. versionadded:: 1.3.0

        Returns
        -------
        DataFrame

        See Also
        --------
        read_sql_table : Read SQL database table into a DataFrame.
        read_sql

        """
        args = _convert_params(sql, params)

        result = self.execute(*args)
        columns = result.keys()

        if chunksize is not None:
            return self._query_iterator(
                result,
                chunksize,
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                dtype=dtype,
            )
        else:
            data = result.fetchall()
            frame = _wrap_result(
                data,
                columns,
                index_col=index_col,
                coerce_float=coerce_float,
                parse_dates=parse_dates,
                dtype=dtype,
            )
            return frame

    # read_sql is an alias: a raw query string is the general entry point.
    read_sql = read_query

    def prep_table(
        self,
        frame,
        name,
        if_exists="fail",
        index=True,
        index_label=None,
        schema=None,
        dtype: DtypeArg | None = None,
    ) -> SQLTable:
        """
        Prepares table in the database for data insertion. Creates it if needed, etc.
        """
        if dtype:
            if not is_dict_like(dtype):
                # A single type was supplied: apply it to every column.
                # error: Value expression in dictionary comprehension has incompatible
                # type "Union[ExtensionDtype, str, dtype[Any], Type[object],
                # Dict[Hashable, Union[ExtensionDtype, Union[str, dtype[Any]],
                # Type[str], Type[float], Type[int], Type[complex], Type[bool],
                # Type[object]]]]"; expected type "Union[ExtensionDtype, str,
                # dtype[Any], Type[object]]"
                dtype = {col_name: dtype for col_name in frame}  # type: ignore[misc]
            else:
                dtype = cast(dict, dtype)

            from sqlalchemy.types import (
                TypeEngine,
                to_instance,
            )

            for col, my_type in dtype.items():
                if not isinstance(to_instance(my_type), TypeEngine):
                    raise ValueError(f"The type of {col} is not a SQLAlchemy type")

        table = SQLTable(
            name,
            self,
            frame=frame,
            index=index,
            if_exists=if_exists,
            index_label=index_label,
            schema=schema,
            dtype=dtype,
        )
        table.create()
        return table

    def check_case_sensitive(
        self,
        name: str,
        schema: str | None,
    ) -> None:
        """
        Checks table name for issues with case-sensitivity.
        Method is called after data is inserted.
        """
        if not name.isdigit() and not name.islower():
            # check for potentially case sensitivity issues (GH7815)
            # Only check when name is not a number and name is not lower case
            from sqlalchemy import inspect as sqlalchemy_inspect

            with self.connectable.connect() as conn:
                insp = sqlalchemy_inspect(conn)
                table_names = insp.get_table_names(schema=schema or self.meta.schema)
            if name not in table_names:
                msg = (
                    f"The provided table name '{name}' is not found exactly as "
                    "such in the database after writing the table, possibly "
                    "due to case sensitivity issues. Consider using lower "
                    "case table names."
                )
                warnings.warn(
                    msg,
                    UserWarning,
                    stacklevel=find_stack_level(),
                )

    def to_sql(
        self,
        frame,
        name: str,
        if_exists: str = "fail",
        index: bool = True,
        index_label=None,
        schema: str | None = None,
        chunksize=None,
        dtype: DtypeArg | None = None,
        method=None,
        engine="auto",
        **engine_kwargs,
    ) -> int | None:
        """
        Write records stored in a DataFrame to a SQL database.

        Parameters
        ----------
        frame : DataFrame
        name : string
            Name of SQL table.
        if_exists : {'fail', 'replace', 'append'}, default 'fail'
            - fail: If table exists, do nothing.
            - replace: If table exists, drop it, recreate it, and insert data.
            - append: If table exists, insert data. Create if does not exist.
        index : boolean, default True
            Write DataFrame index as a column.
        index_label : string or sequence, default None
            Column label for index column(s). If None is given (default) and
            `index` is True, then the index names are used.
            A sequence should be given if the DataFrame uses MultiIndex.
        schema : string, default None
            Name of SQL schema in database to write to (if database flavor
            supports this). If specified, this overwrites the default
            schema of the SQLDatabase object.
        chunksize : int, default None
            If not None, then rows will be written in batches of this size at a
            time. If None, all rows will be written at once.
        dtype : single type or dict of column name to SQL type, default None
            Optional specifying the datatype for columns. The SQL type should
            be a SQLAlchemy type. If all columns are of the same type, one
            single value can be used.
        method : {None, 'multi', callable}, default None
            Controls the SQL insertion clause used:

            * None : Uses standard SQL ``INSERT`` clause (one per row).
            * 'multi': Pass multiple values in a single ``INSERT`` clause.
            * callable with signature ``(pd_table, conn, keys, data_iter)``.

            Details and a sample callable implementation can be found in the
            section :ref:`insert method <io.sql.method>`.
        engine : {'auto', 'sqlalchemy'}, default 'auto'
            SQL engine library to use. If 'auto', then the option
            ``io.sql.engine`` is used. The default ``io.sql.engine``
            behavior is 'sqlalchemy'

            .. versionadded:: 1.3.0

        **engine_kwargs
            Any additional kwargs are passed to the engine.
        """
        sql_engine = get_engine(engine)

        table = self.prep_table(
            frame=frame,
            name=name,
            if_exists=if_exists,
            index=index,
            index_label=index_label,
            schema=schema,
            dtype=dtype,
        )

        total_inserted = sql_engine.insert_records(
            table=table,
            con=self.connectable,
            frame=frame,
            name=name,
            index=index,
            schema=schema,
            chunksize=chunksize,
            method=method,
            **engine_kwargs,
        )

        # Warn (after the fact) when the written name can't be found
        # verbatim, which usually indicates a case-sensitivity issue.
        self.check_case_sensitive(name=name, schema=schema)
        return total_inserted

    @property
    def tables(self):
        # Tables currently reflected into our MetaData.
        return self.meta.tables

    def has_table(self, name: str, schema: str | None = None):
        from sqlalchemy import inspect as sqlalchemy_inspect

        insp = sqlalchemy_inspect(self.connectable)
        return insp.has_table(name, schema or self.meta.schema)

    def get_table(self, table_name: str, schema: str | None = None) -> Table:
        # Reflect the table from the database.
        from sqlalchemy import (
            Numeric,
            Table,
        )

        schema = schema or self.meta.schema
        tbl = Table(
            table_name, self.meta, autoload_with=self.connectable, schema=schema
        )
        for column in tbl.columns:
            if isinstance(column.type, Numeric):
                # Return floats rather than decimal.Decimal objects.
                column.type.asdecimal = False
        return tbl

    def drop_table(self, table_name: str, schema: str | None = None) -> None:
        schema = schema or self.meta.schema
        if self.has_table(table_name, schema):
            self.meta.reflect(bind=self.connectable, only=[table_name], schema=schema)
            self.get_table(table_name, schema).drop(bind=self.connectable)
            self.meta.clear()

    def _create_sql_schema(
        self,
        frame: DataFrame,
        table_name: str,
        keys: list[str] | None = None,
        dtype: DtypeArg | None = None,
        schema: str | None = None,
    ):
        # Render the CREATE TABLE statement without executing it.
        table = SQLTable(
            table_name,
            self,
            frame=frame,
            index=False,
            keys=keys,
            dtype=dtype,
            schema=schema,
        )
        return str(table.sql_schema())
# ---- SQL without SQLAlchemy ---
# sqlite-specific sql strings and handler class
# dictionary used for readability purposes

# Maps dtype names as returned by lib.infer_dtype to SQLite column type
# names; _sql_type_name falls back to "string" (TEXT) for anything else.
# Booleans are stored as INTEGER because SQLite has no boolean type.
_SQL_TYPES = {
    "string": "TEXT",
    "floating": "REAL",
    "integer": "INTEGER",
    "datetime": "TIMESTAMP",
    "date": "DATE",
    "time": "TIME",
    "boolean": "INTEGER",
}
1819def _get_unicode_name(name):
1820 try:
1821 uname = str(name).encode("utf-8", "strict").decode("utf-8")
1822 except UnicodeError as err:
1823 raise ValueError(f"Cannot convert identifier to UTF-8: '{name}'") from err
1824 return uname
def _get_valid_sqlite_name(name):
    # See https://stackoverflow.com/questions/6514274/how-do-you-escape-strings\
    # -for-sqlite-table-column-names-in-python
    # Quoting strategy:
    #   * ensure the name is valid UTF-8,
    #   * reject empty names and names containing NUL characters,
    #   * double any embedded double-quotes, then wrap in double-quotes.
    uname = _get_unicode_name(name)
    if not uname:
        raise ValueError("Empty table or column name specified")

    if "\x00" in uname:
        raise ValueError("SQLite identifier cannot contain NULs")
    escaped = uname.replace('"', '""')
    return f'"{escaped}"'
class SQLiteTable(SQLTable):
    """
    Patch the SQLTable for fallback support.
    Instead of a table variable just use the Create Table statement.
    """

    def __init__(self, *args, **kwargs) -> None:
        # GH 8341
        # register an adapter callable for datetime.time object
        import sqlite3

        # this will transform time(12,34,56,789) into '12:34:56.000789'
        # (this is what sqlalchemy does)
        def _adapt_time(t):
            # This is faster than strftime
            return f"{t.hour:02d}:{t.minute:02d}:{t.second:02d}.{t.microsecond:06d}"

        sqlite3.register_adapter(time, _adapt_time)
        super().__init__(*args, **kwargs)

    def sql_schema(self) -> str:
        # self.table is the list of CREATE statements produced by
        # _create_table_setup; join them into one script.
        return str(";\n".join(self.table))

    def _execute_create(self):
        # Execute every CREATE TABLE / CREATE INDEX statement inside a
        # single transaction.
        with self.pd_sql.run_transaction() as conn:
            for stmt in self.table:
                conn.execute(stmt)

    def insert_statement(self, *, num_rows: int) -> str:
        """
        Build a parameterised INSERT statement covering ``num_rows`` rows,
        with index column(s) (if any) placed before the data columns.
        """
        names = list(map(str, self.frame.columns))
        wld = "?"  # wildcard char
        escape = _get_valid_sqlite_name

        if self.index is not None:
            # Prepend index labels preserving their original order.
            for idx in self.index[::-1]:
                names.insert(0, idx)

        bracketed_names = [escape(column) for column in names]
        col_names = ",".join(bracketed_names)

        row_wildcards = ",".join([wld] * len(names))
        wildcards = ",".join([f"({row_wildcards})" for _ in range(num_rows)])
        insert_statement = (
            f"INSERT INTO {escape(self.name)} ({col_names}) VALUES {wildcards}"
        )
        return insert_statement

    def _execute_insert(self, conn, keys, data_iter) -> int:
        # One single-row INSERT executed once per row via executemany.
        data_list = list(data_iter)
        conn.executemany(self.insert_statement(num_rows=1), data_list)
        return conn.rowcount

    def _execute_insert_multi(self, conn, keys, data_iter) -> int:
        # Single multi-row INSERT; parameters must be flattened row-major.
        data_list = list(data_iter)
        flattened_data = [x for row in data_list for x in row]
        conn.execute(self.insert_statement(num_rows=len(data_list)), flattened_data)
        return conn.rowcount

    def _create_table_setup(self):
        """
        Return a list of SQL statements that creates a table reflecting the
        structure of a DataFrame. The first entry will be a CREATE TABLE
        statement while the rest will be CREATE INDEX statements.
        """
        column_names_and_types = self._get_column_names_and_types(self._sql_type_name)
        escape = _get_valid_sqlite_name

        create_tbl_stmts = [
            escape(cname) + " " + ctype for cname, ctype, _ in column_names_and_types
        ]

        if self.keys is not None and len(self.keys):
            if not is_list_like(self.keys):
                keys = [self.keys]
            else:
                keys = self.keys
            cnames_br = ", ".join([escape(c) for c in keys])
            create_tbl_stmts.append(
                f"CONSTRAINT {self.name}_pk PRIMARY KEY ({cnames_br})"
            )
        if self.schema:
            schema_name = self.schema + "."
        else:
            schema_name = ""
        create_stmts = [
            "CREATE TABLE "
            + schema_name
            + escape(self.name)
            + " (\n"
            + ",\n ".join(create_tbl_stmts)
            + "\n)"
        ]

        ix_cols = [cname for cname, _, is_index in column_names_and_types if is_index]
        if len(ix_cols):
            cnames = "_".join(ix_cols)
            cnames_br = ",".join([escape(c) for c in ix_cols])
            create_stmts.append(
                "CREATE INDEX "
                # NOTE(review): no space before "ON" — the emitted SQL reads
                # `"ix_x"ON "x"`. Valid for SQLite (the closing quote ends
                # the identifier token) but a space would be clearer.
                + escape("ix_" + self.name + "_" + cnames)
                + "ON "
                + escape(self.name)
                + " ("
                + cnames_br
                + ")"
            )

        return create_stmts

    def _sql_type_name(self, col):
        # An explicit user-supplied dtype takes priority over inference.
        dtype: DtypeArg = self.dtype or {}
        if is_dict_like(dtype):
            dtype = cast(dict, dtype)
            if col.name in dtype:
                return dtype[col.name]

        # Infer type of column, while ignoring missing values.
        # Needed for inserting typed data containing NULLs, GH 8778.
        col_type = lib.infer_dtype(col, skipna=True)

        if col_type == "timedelta64":
            warnings.warn(
                "the 'timedelta' type is not supported, and will be "
                "written as integer values (ns frequency) to the database.",
                UserWarning,
                stacklevel=find_stack_level(),
            )
            col_type = "integer"

        elif col_type == "datetime64":
            col_type = "datetime"

        elif col_type == "empty":
            col_type = "string"

        elif col_type == "complex":
            raise ValueError("Complex datatypes not supported")

        if col_type not in _SQL_TYPES:
            # Anything unrecognised is stored as TEXT.
            col_type = "string"

        return _SQL_TYPES[col_type]
1989class SQLiteDatabase(PandasSQL):
1990 """
1991 Version of SQLDatabase to support SQLite connections (fallback without
1992 SQLAlchemy). This should only be used internally.
1994 Parameters
1995 ----------
1996 con : sqlite connection object
1998 """
def __init__(self, con) -> None:
    # con: a DBAPI2 sqlite connection object (fallback without SQLAlchemy).
    self.con = con
@contextmanager
def run_transaction(self):
    """
    Context manager yielding a fresh cursor.

    Commits on success, rolls back (and re-raises) on any exception,
    and always closes the cursor.
    """
    cursor = self.con.cursor()
    try:
        yield cursor
        # Commit inside the try so a failing commit also rolls back.
        self.con.commit()
    except Exception:
        self.con.rollback()
        raise
    finally:
        cursor.close()
def execute(self, *args, **kwargs):
    """
    Execute a statement on a fresh cursor and return the cursor.

    On failure the connection is rolled back and a DatabaseError is
    raised chained to the driver exception; a failed rollback is
    reported as its own DatabaseError.
    """
    cursor = self.con.cursor()
    try:
        cursor.execute(*args, **kwargs)
    except Exception as exc:
        try:
            self.con.rollback()
        except Exception as inner_exc:  # pragma: no cover
            raise DatabaseError(
                f"Execution failed on sql: {args[0]}\n{exc}\nunable to rollback"
            ) from inner_exc

        raise DatabaseError(f"Execution failed on sql '{args[0]}': {exc}") from exc
    return cursor
2032 @staticmethod
2033 def _query_iterator(
2034 cursor,
2035 chunksize: int,
2036 columns,
2037 index_col=None,
2038 coerce_float: bool = True,
2039 parse_dates=None,
2040 dtype: DtypeArg | None = None,
2041 ):
2042 """Return generator through chunked result set"""
2043 has_read_data = False
2044 while True:
2045 data = cursor.fetchmany(chunksize)
2046 if type(data) == tuple:
2047 data = list(data)
2048 if not data:
2049 cursor.close()
2050 if not has_read_data:
2051 yield DataFrame.from_records(
2052 [], columns=columns, coerce_float=coerce_float
2053 )
2054 break
2055 else:
2056 has_read_data = True
2057 yield _wrap_result(
2058 data,
2059 columns,
2060 index_col=index_col,
2061 coerce_float=coerce_float,
2062 parse_dates=parse_dates,
2063 dtype=dtype,
2064 )
def read_query(
    self,
    sql,
    index_col=None,
    coerce_float: bool = True,
    params=None,
    parse_dates=None,
    chunksize: int | None = None,
    dtype: DtypeArg | None = None,
) -> DataFrame | Iterator[DataFrame]:
    """
    Execute ``sql`` (with optional DBAPI ``params``) and return the
    result as a DataFrame, or as an iterator of DataFrames when
    ``chunksize`` is given.
    """
    cursor = self.execute(*_convert_params(sql, params))
    # Column labels come from the cursor description (name is field 0).
    columns = [col_desc[0] for col_desc in cursor.description]

    if chunksize is not None:
        return self._query_iterator(
            cursor,
            chunksize,
            columns,
            index_col=index_col,
            coerce_float=coerce_float,
            parse_dates=parse_dates,
            dtype=dtype,
        )

    data = self._fetchall_as_list(cursor)
    cursor.close()

    return _wrap_result(
        data,
        columns,
        index_col=index_col,
        coerce_float=coerce_float,
        parse_dates=parse_dates,
        dtype=dtype,
    )
2105 def _fetchall_as_list(self, cur):
2106 result = cur.fetchall()
2107 if not isinstance(result, list):
2108 result = list(result)
2109 return result
def to_sql(
    self,
    frame,
    name,
    if_exists: str = "fail",
    index: bool = True,
    index_label=None,
    schema=None,
    chunksize=None,
    dtype: DtypeArg | None = None,
    method=None,
    **kwargs,
) -> int | None:
    """
    Write records stored in a DataFrame to a SQL database.

    Parameters
    ----------
    frame: DataFrame
    name: string
        Name of SQL table.
    if_exists: {'fail', 'replace', 'append'}, default 'fail'
        fail: If table exists, do nothing.
        replace: If table exists, drop it, recreate it, and insert data.
        append: If table exists, insert data. Create if it does not exist.
    index : bool, default True
        Write DataFrame index as a column
    index_label : string or sequence, default None
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    schema : string, default None
        Ignored parameter included for compatibility with SQLAlchemy
        version of ``to_sql``.
    chunksize : int, default None
        If not None, then rows will be written in batches of this
        size at a time. If None, all rows will be written at once.
    dtype : single type or dict of column name to SQL type, default None
        Optional specifying the datatype for columns. The SQL type should
        be a string. If all columns are of the same type, one single value
        can be used.
    method : {None, 'multi', callable}, default None
        Controls the SQL insertion clause used:

        * None : Uses standard SQL ``INSERT`` clause (one per row).
        * 'multi': Pass multiple values in a single ``INSERT`` clause.
        * callable with signature ``(pd_table, conn, keys, data_iter)``.

        Details and a sample callable implementation can be found in the
        section :ref:`insert method <io.sql.method>`.
    """
    if dtype:
        if is_dict_like(dtype):
            dtype = cast(dict, dtype)
        else:
            # A single type was given: apply it to every column.
            # error: Value expression in dictionary comprehension has incompatible
            # type "Union[ExtensionDtype, str, dtype[Any], Type[object],
            # Dict[Hashable, Union[ExtensionDtype, Union[str, dtype[Any]],
            # Type[str], Type[float], Type[int], Type[complex], Type[bool],
            # Type[object]]]]"; expected type "Union[ExtensionDtype, str,
            # dtype[Any], Type[object]]"
            dtype = {col_name: dtype for col_name in frame}  # type: ignore[misc]

        # The SQLite fallback expects SQL type names as plain strings.
        for col, my_type in dtype.items():
            if not isinstance(my_type, str):
                raise ValueError(f"{col} ({my_type}) not a string")

    table = SQLiteTable(
        name,
        self,
        frame=frame,
        index=index,
        if_exists=if_exists,
        index_label=index_label,
        dtype=dtype,
    )
    table.create()
    return table.insert(chunksize, method)
2190 def has_table(self, name: str, schema: str | None = None) -> bool:
2192 wld = "?"
2193 query = f"SELECT name FROM sqlite_master WHERE type='table' AND name={wld};"
2195 return len(self.execute(query, [name]).fetchall()) > 0
2197 def get_table(self, table_name: str, schema: str | None = None) -> None:
2198 return None # not supported in fallback mode
2200 def drop_table(self, name: str, schema: str | None = None) -> None:
2201 drop_sql = f"DROP TABLE {_get_valid_sqlite_name(name)}"
2202 self.execute(drop_sql)
2204 def _create_sql_schema(
2205 self,
2206 frame,
2207 table_name: str,
2208 keys=None,
2209 dtype: DtypeArg | None = None,
2210 schema: str | None = None,
2211 ):
2212 table = SQLiteTable(
2213 table_name,
2214 self,
2215 frame=frame,
2216 index=False,
2217 keys=keys,
2218 dtype=dtype,
2219 schema=schema,
2220 )
2221 return str(table.sql_schema())
def get_schema(
    frame,
    name: str,
    keys=None,
    con=None,
    dtype: DtypeArg | None = None,
    schema: str | None = None,
) -> str:
    """
    Get the SQL db table schema for the given frame.

    Parameters
    ----------
    frame : DataFrame
    name : str
        name of SQL table
    keys : string or sequence, default: None
        columns to use a primary key
    con: an open SQL database connection object or a SQLAlchemy connectable
        Using SQLAlchemy makes it possible to use any DB supported by that
        library, default: None
        If a DBAPI2 object, only sqlite3 is supported.
    dtype : dict of column name to SQL type, default None
        Optional specifying the datatype for columns. The SQL type should
        be a SQLAlchemy type, or a string for sqlite3 fallback connection.
    schema: str, default: None
        Optional specifying the schema to be used in creating the table.

        .. versionadded:: 1.2.0
    """
    # Delegate to the connection-appropriate backend (SQLAlchemy or the
    # sqlite3 fallback) to render the CREATE TABLE statement.
    return pandasSQL_builder(con=con)._create_sql_schema(
        frame, name, keys=keys, dtype=dtype, schema=schema
    )