Skip to content

medspacy.io.db_connect

DbConnect

DbConnect is a wrapper for either a pyodbc or sqlite3 connection. It can then be passed into the DbReader and DbWriter classes to retrieve/store document data.

Source code in medspacy/io/db_connect.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
class DbConnect:
    """Wrap a sqlite3 or pyodbc connection behind a small read/write API.

    An instance can be passed into the DbReader and DbWriter classes to
    retrieve/store document data.

    Attributes set by ``__init__``:
        conn: The underlying sqlite3 or pyodbc connection object.
        cursor: A cursor created from ``conn`` and reused by every method.
        db_lib: ``"sqlite3"`` or ``"pyodbc"``, naming the detected library.
        database_exception: The ``DatabaseError`` class of that library; the
            methods below catch it to roll back before re-raising.
    """

    def __init__(
        self, driver=None, server=None, db=None, user=None, pwd=None, conn=None
    ):
        """Create a new DbConnect object.

        Either pass the pieces of a pyodbc connection string, or pass an
        already-open sqlite3 or pyodbc connection object via ``conn``.

        If ``conn`` is None, all other arguments must be supplied. If ``conn``
        is passed in, the other arguments are not used to connect.

        Args:
            driver: ODBC driver name placed in the connection string.
            server: Server placed in the connection string.
            db: Database name placed in the connection string.
            user: User name placed in the connection string.
            pwd: Password placed in the connection string.
            conn: An existing sqlite3 or pyodbc connection object.

        Raises:
            ValueError: If ``conn`` is None and any other argument is missing
                or empty, or if the connection is neither a sqlite3 nor a
                pyodbc connection.
        """
        import sqlite3

        if conn is None:
            if not all([driver, server, db, user, pwd]):
                raise ValueError(
                    "If you are not passing in a connection object, "
                    "you must pass in all other arguments to create a DB connection."
                )
            import pyodbc

            # NOTE(review): some ODBC drivers expect UID rather than USER for
            # the user name keyword -- confirm against the target driver.
            conn = pyodbc.connect(
                "DRIVER={0};SERVER={1};DATABASE={2};USER={3};PWD={4}".format(
                    driver, server, db, user, pwd
                )
            )

        # Validate the connection type *before* touching it, so an unsupported
        # object yields the documented ValueError rather than an
        # AttributeError from ``.cursor()``.
        if isinstance(conn, sqlite3.Connection):
            db_lib = "sqlite3"
            database_exception = sqlite3.DatabaseError
        else:
            # pyodbc is optional: without it the object cannot be a pyodbc
            # connection, so fall through to the ValueError below.
            try:
                import pyodbc
            except ImportError:
                pyodbc = None
            if pyodbc is not None and isinstance(conn, pyodbc.Connection):
                db_lib = "pyodbc"
                database_exception = pyodbc.DatabaseError
            else:
                raise ValueError(
                    "conn must be either a sqlite3 or pyodbc Connection object, not {0}".format(
                        type(conn)
                    )
                )

        self.conn = conn
        self.db_lib = db_lib
        self.database_exception = database_exception
        self.cursor = self.conn.cursor()
        # according this thread, bulk insert for sqlserver need to set fast_executemany=True.
        # https://stackoverflow.com/questions/29638136/how-to-speed-up-bulk-insert-to-ms-sql-server-using-pyodbc
        if hasattr(self.cursor, "fast_executemany"):
            self.cursor.fast_executemany = True

        if server is None and db is None:
            # A ready-made connection was supplied; there is no server/db
            # name to report, so avoid printing "None.None".
            print("Opened connection using existing {0} connection".format(db_lib))
        else:
            print("Opened connection to {0}.{1}".format(server, db))

    def create_table(self, query, table_name, drop_existing):
        """Create a table, optionally dropping an existing one first.

        Args:
            query: The full table-creation statement to execute.
            table_name: Name of the table. It is interpolated directly into
                the ``drop table`` statement, so it must come from a trusted
                source.
            drop_existing: If truthy, attempt ``drop table if exists`` first.

        Raises:
            The library's ``DatabaseError`` if ``query`` fails. In that case
            the transaction is rolled back and the connection is closed
            before the error propagates.
        """
        if drop_existing:
            try:
                # SECURITY: identifiers cannot be bound as parameters, so
                # table_name is formatted into the statement unescaped.
                self.cursor.execute("drop table if exists {0}".format(table_name))
            except self.database_exception:
                # Dropping is best-effort; roll back so the failed statement
                # does not leave the transaction in an error state.
                self.conn.rollback()
            else:
                self.conn.commit()
        try:
            self.cursor.execute(query)
        except self.database_exception:
            self.conn.rollback()
            self.conn.close()
            raise
        else:
            self.conn.commit()
            print("Created table {0} with query: {1}".format(table_name, query))

    def write(self, query, data):
        """Execute ``query`` once per row in ``data`` and commit.

        Args:
            query: A parameterized statement passed to ``cursor.executemany``.
            data: A sequence of parameter rows for ``query``.

        Raises:
            The library's ``DatabaseError`` if execution fails. In that case
            the transaction is rolled back and the connection is closed
            before the error propagates.
        """
        try:
            self.cursor.executemany(query, data)
        except self.database_exception:
            self.conn.rollback()
            self.conn.close()
            raise
        else:
            self.conn.commit()

    def read(self, query):
        """Execute ``query`` and return every row from ``cursor.fetchall()``."""
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def close(self):
        """Commit any pending work, close the connection, and report it."""
        self.conn.commit()
        self.conn.close()
        print("Connection closed.")

__init__(driver=None, server=None, db=None, user=None, pwd=None, conn=None)

Create a new DbConnect object. You can pass in either information for a pyodbc connection string or directly pass in a sqlite or pyodbc connection object.

If conn is None, all other arguments must be supplied. If conn is passed in, all other arguments will be ignored.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| db   |      |             | None    |
Source code in medspacy/io/db_connect.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self, driver=None, server=None, db=None, user=None, pwd=None, conn=None
):
    """Create a new DbConnect object.

    Either pass the pieces of a pyodbc connection string, or pass an
    already-open sqlite3 or pyodbc connection object via ``conn``.

    If ``conn`` is None, all other arguments must be supplied. If ``conn``
    is passed in, the other arguments are not used to connect.

    Args:
        driver: ODBC driver name placed in the connection string.
        server: Server placed in the connection string.
        db: Database name placed in the connection string.
        user: User name placed in the connection string.
        pwd: Password placed in the connection string.
        conn: An existing sqlite3 or pyodbc connection object.

    Raises:
        ValueError: If ``conn`` is None and any other argument is missing
            or empty, or if the connection is neither a sqlite3 nor a
            pyodbc connection.
    """
    import sqlite3

    if conn is None:
        if not all([driver, server, db, user, pwd]):
            raise ValueError(
                "If you are not passing in a connection object, "
                "you must pass in all other arguments to create a DB connection."
            )
        import pyodbc

        # NOTE(review): some ODBC drivers expect UID rather than USER for
        # the user name keyword -- confirm against the target driver.
        conn = pyodbc.connect(
            "DRIVER={0};SERVER={1};DATABASE={2};USER={3};PWD={4}".format(
                driver, server, db, user, pwd
            )
        )

    # Validate the connection type *before* touching it, so an unsupported
    # object yields the documented ValueError rather than an AttributeError
    # from ``.cursor()``.
    if isinstance(conn, sqlite3.Connection):
        db_lib = "sqlite3"
        database_exception = sqlite3.DatabaseError
    else:
        # pyodbc is optional: without it the object cannot be a pyodbc
        # connection, so fall through to the ValueError below.
        try:
            import pyodbc
        except ImportError:
            pyodbc = None
        if pyodbc is not None and isinstance(conn, pyodbc.Connection):
            db_lib = "pyodbc"
            database_exception = pyodbc.DatabaseError
        else:
            raise ValueError(
                "conn must be either a sqlite3 or pyodbc Connection object, not {0}".format(
                    type(conn)
                )
            )

    self.conn = conn
    self.db_lib = db_lib
    self.database_exception = database_exception
    self.cursor = self.conn.cursor()
    # according this thread, bulk insert for sqlserver need to set fast_executemany=True.
    # https://stackoverflow.com/questions/29638136/how-to-speed-up-bulk-insert-to-ms-sql-server-using-pyodbc
    if hasattr(self.cursor, "fast_executemany"):
        self.cursor.fast_executemany = True

    if server is None and db is None:
        # A ready-made connection was supplied; there is no server/db name
        # to report, so avoid printing "None.None".
        print("Opened connection using existing {0} connection".format(db_lib))
    else:
        print("Opened connection to {0}.{1}".format(server, db))