Skip to content

medspacy.io

DbConnect

DbConnect is a wrapper for either a pyodbc or sqlite3 connection. It can then be passed into the DbReader and DbWriter classes to retrieve/store document data.

Source code in medspacy/io/db_connect.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
class DbConnect:
    """DbConnect is a wrapper for either a pyodbc or sqlite3 connection. It can then be
    passed into the DbReader and DbWriter classes to retrieve/store document data.

    Attributes:
        conn: The wrapped sqlite3 or pyodbc connection object.
        cursor: A single cursor created from ``conn`` and reused by every method.
        db_lib: Either "sqlite3" or "pyodbc", depending on the type of ``conn``.
        database_exception: The ``DatabaseError`` class of the library in use. This is
            the exception type caught by ``create_table`` and ``write``.
    """

    def __init__(
        self, driver=None, server=None, db=None, user=None, pwd=None, conn=None
    ):
        """Create a new DbConnect object. You can pass in either information for a pyodbc connection string
        or directly pass in a sqlite or pyodbc connection object.

        If conn is None, all other arguments must be supplied. If conn is passed in, all other arguments will be ignored.

        Args:
            driver: Value for the DRIVER field of the pyodbc connection string.
            server: Value for the SERVER field of the pyodbc connection string.
            db: Value for the DATABASE field of the pyodbc connection string.
            user: Value for the USER field of the pyodbc connection string.
            pwd: Value for the PWD field of the pyodbc connection string.
            conn: An already-open sqlite3 or pyodbc connection object.

        Raises:
            ValueError: If conn is None and any of the other arguments is missing or
                empty, or if conn is neither a sqlite3 nor a pyodbc connection.
        """
        import sqlite3

        conn_was_supplied = conn is not None
        if not conn_was_supplied:
            if not all([driver, server, db, user, pwd]):
                raise ValueError(
                    "If you are not passing in a connection object, "
                    "you must pass in all other arguments to create a DB connection."
                )
            import pyodbc

            conn = pyodbc.connect(
                "DRIVER={0};SERVER={1};DATABASE={2};USER={3};PWD={4}".format(
                    driver, server, db, user, pwd
                )
            )

        # Check the connection type BEFORE using the object, so an unsupported value is
        # reported with the ValueError below rather than an AttributeError from .cursor().
        if isinstance(conn, sqlite3.Connection):
            self.db_lib = "sqlite3"
            self.database_exception = sqlite3.DatabaseError
        else:
            # pyodbc is optional: without it, a non-sqlite3 object cannot be a supported
            # connection, so that case is also reported as an invalid conn.
            try:
                import pyodbc
            except ImportError:
                pyodbc = None
            if pyodbc is None or not isinstance(conn, pyodbc.Connection):
                raise ValueError(
                    "conn must be either a sqlite3 or pyodbc Connection object, not {0}".format(
                        type(conn)
                    )
                )
            self.db_lib = "pyodbc"
            self.database_exception = pyodbc.DatabaseError

        self.conn = conn
        self.cursor = self.conn.cursor()
        # according this thread, bulk insert for sqlserver need to set fast_executemany=True.
        # https://stackoverflow.com/questions/29638136/how-to-speed-up-bulk-insert-to-ms-sql-server-using-pyodbc
        if hasattr(self.cursor, 'fast_executemany'):
            self.cursor.fast_executemany = True

        if conn_was_supplied:
            # server/db are ignored in this mode, so do not report them as the target.
            print(
                "Opened connection using the supplied {0} connection object.".format(
                    self.db_lib
                )
            )
        else:
            print("Opened connection to {0}.{1}".format(server, db))

    def create_table(self, query, table_name, drop_existing):
        """Run a CREATE TABLE statement, optionally dropping the table first.

        Args:
            query: The full SQL statement that creates the table.
            table_name: Name of the table; used for the optional drop and the log message.
            drop_existing: If truthy, issue "drop table if exists" before creating.

        Raises:
            self.database_exception: If executing ``query`` fails. The transaction is
                rolled back and the connection is closed before the error is re-raised.
        """
        if drop_existing:
            try:
                self.cursor.execute("drop table if exists {0}".format(table_name))
            except self.database_exception:
                # Best effort: a failed drop is ignored and the create is still attempted.
                pass
            else:
                self.conn.commit()
        try:
            self.cursor.execute(query)
        except self.database_exception:
            self.conn.rollback()
            self.conn.close()
            raise
        else:
            self.conn.commit()
            print("Created table {0} with query: {1}".format(table_name, query))

    def write(self, query, data):
        """Execute ``query`` once per row of ``data`` via ``executemany`` and commit.

        Args:
            query: A parameterized SQL statement.
            data: A sequence of parameter rows for the statement.

        Raises:
            self.database_exception: If the insert fails. The transaction is rolled back
                and the connection is closed before the error is re-raised.
        """
        try:
            self.cursor.executemany(query, data)
        except self.database_exception:
            self.conn.rollback()
            self.conn.close()
            raise
        else:
            self.conn.commit()

    def read(self, query):
        """Execute ``query`` and return every resulting row from ``fetchall``."""
        self.cursor.execute(query)
        result = self.cursor.fetchall()
        return result

    def close(self):
        """Commit any pending work and close the underlying connection."""
        self.conn.commit()
        self.conn.close()
        print("Connection closed.")

__init__(driver=None, server=None, db=None, user=None, pwd=None, conn=None)

Create a new DbConnect object. You can pass in either information for a pyodbc connection string or directly pass in a sqlite or pyodbc connection object.

If conn is None, all other arguments must be supplied. If conn is passed in, all other arguments will be ignored.

Parameters:

Name Type Description Default
db
None
Source code in medspacy/io/db_connect.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def __init__(
    self, driver=None, server=None, db=None, user=None, pwd=None, conn=None
):
    """Create a new DbConnect object. You can pass in either information for a pyodbc connection string
    or directly pass in a sqlite or pyodbc connection object.

    If conn is None, all other arguments must be supplied. If conn is passed in, all other arguments will be ignored.

    Args:
        driver: Value for the DRIVER field of the pyodbc connection string.
        server: Value for the SERVER field of the pyodbc connection string.
        db: Value for the DATABASE field of the pyodbc connection string.
        user: Value for the USER field of the pyodbc connection string.
        pwd: Value for the PWD field of the pyodbc connection string.
        conn: An already-open sqlite3 or pyodbc connection object.

    Raises:
        ValueError: If conn is None and any of the other arguments is missing or
            empty, or if conn is neither a sqlite3 nor a pyodbc connection.
    """
    import sqlite3

    conn_was_supplied = conn is not None
    if not conn_was_supplied:
        if not all([driver, server, db, user, pwd]):
            raise ValueError(
                "If you are not passing in a connection object, "
                "you must pass in all other arguments to create a DB connection."
            )
        import pyodbc

        conn = pyodbc.connect(
            "DRIVER={0};SERVER={1};DATABASE={2};USER={3};PWD={4}".format(
                driver, server, db, user, pwd
            )
        )

    # Check the connection type BEFORE using the object, so an unsupported value is
    # reported with the ValueError below rather than an AttributeError from .cursor().
    if isinstance(conn, sqlite3.Connection):
        self.db_lib = "sqlite3"
        self.database_exception = sqlite3.DatabaseError
    else:
        # pyodbc is optional: without it, a non-sqlite3 object cannot be a supported
        # connection, so that case is also reported as an invalid conn.
        try:
            import pyodbc
        except ImportError:
            pyodbc = None
        if pyodbc is None or not isinstance(conn, pyodbc.Connection):
            raise ValueError(
                "conn must be either a sqlite3 or pyodbc Connection object, not {0}".format(
                    type(conn)
                )
            )
        self.db_lib = "pyodbc"
        self.database_exception = pyodbc.DatabaseError

    self.conn = conn
    self.cursor = self.conn.cursor()
    # according this thread, bulk insert for sqlserver need to set fast_executemany=True.
    # https://stackoverflow.com/questions/29638136/how-to-speed-up-bulk-insert-to-ms-sql-server-using-pyodbc
    if hasattr(self.cursor, 'fast_executemany'):
        self.cursor.fast_executemany = True

    if conn_was_supplied:
        # server/db are ignored in this mode, so do not report them as the target.
        print(
            "Opened connection using the supplied {0} connection object.".format(
                self.db_lib
            )
        )
    else:
        print("Opened connection to {0}.{1}".format(server, db))

DbWriter

DbWriter is a utility class for writing structured data back to a database.

Source code in medspacy/io/db_writer.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
class DbWriter:
    """DbWriter is a utility class for writing structured data back to a database."""

    def __init__(
            self,
            db_conn,
            destination_table,
            cols=None,
            col_types=None,
            doc_dtype="ents",
            create_table=False,
            drop_existing=False,
            write_batch_size=100,
    ):
        """Create a new DbWriter object.

        Args:
            db_conn: A medspacy.io.DbConnect object
            destination_table: The name of the table to write to
            cols (opt): The names of the columns of the destination table. These should align with attributes extracted
                by DocConsumer and stored in doc._.data. A set of default values can be accessed by:
                >>> DbWriter.get_default_cols()
            col_types (opt): The sql data types of the table columns. They should correspond 1:1 with cols.
                A set of default values can be accessed by:
                >>> DbWriter.get_default_col_types()
            doc_dtype: The type of data from DocConsumer to write from a doc.
                Either ("ents", "section", "context", or "doc")
            create_table (bool): Whether to create a table
            drop_existing (bool): Passed to DbConnect.create_table; whether to drop an existing
                table of the same name before creating it.
            write_batch_size: Stored as ``self.batch_size``.
                NOTE(review): ``write_docs`` batches by its own ``batch_size`` argument and does
                not read this value - confirm which one is intended to win.

        Raises:
            ValueError: If ``col_types`` is given without ``cols``.
        """
        # Validate the dtype first: the default lookups below index by doc_dtype, so an
        # unsupported value would otherwise fail with a bare KeyError before this check ran.
        _validate_dtypes((doc_dtype,))
        self.db = db_conn
        self.destination_table = destination_table
        self._create_table = create_table
        self.drop_existing = drop_existing
        if cols is None:
            if col_types is not None:
                raise ValueError("cols must be specified if col_types is not None.")
            cols = DEFAULT_COLS[doc_dtype]
            col_types = [DEFAULT_COL_TYPES[doc_dtype][col] for col in cols]
        self.cols = cols
        self.col_types = col_types
        self.doc_dtype = doc_dtype
        self.batch_size = write_batch_size

        self.insert_query = ""
        if create_table:
            self.create_table()
        self.make_insert_query()

    @classmethod
    def get_default_col_types(cls, dtypes=None):
        """Return the default column types for the requested data types.

        Args:
            dtypes: A single dtype name, an iterable of dtype names, or None for all of them.

        Returns:
            A dict mapping each requested dtype to its entry in DEFAULT_COL_TYPES.
        """
        if dtypes is None:
            dtypes = tuple(DEFAULT_COL_TYPES.keys())
        elif isinstance(dtypes, str):
            dtypes = (dtypes,)

        _validate_dtypes(dtypes)
        return {
            dtype: col_types
            for (dtype, col_types) in DEFAULT_COL_TYPES.items()
            if dtype in dtypes
        }

    @classmethod
    def get_default_cols(cls, dtypes=None):
        """Return the default column names for the requested data types.

        Args:
            dtypes: A single dtype name, an iterable of dtype names, or None for all of them.

        Returns:
            A dict mapping each requested dtype to a list of its default column names.
        """
        if dtypes is None:
            dtypes = tuple(DEFAULT_COLS.keys())
        elif isinstance(dtypes, str):
            dtypes = (dtypes,)
        _validate_dtypes(dtypes)

        # Read from DEFAULT_COLS (the same source __init__ uses for default column names)
        # rather than DEFAULT_COL_TYPES, and copy so callers cannot mutate the defaults.
        return {
            dtype: list(cols)
            for (dtype, cols) in DEFAULT_COLS.items()
            if dtype in dtypes
        }

    def create_table(self):
        """Build a CREATE TABLE statement from cols/col_types and run it through the DbConnect.

        Raises:
            ValueError: If ``col_types`` is missing or has fewer entries than ``cols``.
        """
        if self.col_types is None or len(self.col_types) < len(self.cols):
            raise ValueError(
                "col_types must provide a SQL type for every column in cols "
                "in order to create a table."
            )
        # zip stops at the end of cols, so surplus col_types entries are ignored.
        col_defs = ", ".join(
            "{0} {1}".format(col, col_type)
            for col, col_type in zip(self.cols, self.col_types)
        )
        query = "CREATE TABLE {0} ({1})".format(self.destination_table, col_defs)
        self.db.create_table(query, self.destination_table, self.drop_existing)

    def make_insert_query(self):
        """Store a parameterized INSERT statement for the destination table in ``self.insert_query``."""
        col_list = ", ".join(self.cols)
        q_list = ", ".join("?" for _ in self.cols)
        self.insert_query = "INSERT INTO {0} ({1}) VALUES ({2})".format(
            self.destination_table, col_list, q_list
        )

    def write(self, docs: Union[Doc, List[Doc]]):
        """Write a list of docs or doc to a database."""
        if isinstance(docs, Doc):
            self.write_doc(docs)
        else:
            self.write_docs(docs)

    def write_doc(self, doc):
        """Write a doc to a database. Nothing is written if the doc produces no rows."""
        data = list(doc._.get_data(self.doc_dtype, attrs=self.cols, as_rows=True))
        # Mirror write_docs: never send an empty batch to executemany. pyodbc is
        # understood to reject an empty parameter list (see pyodbc docs), and DbConnect.write
        # closes the connection on any database error.
        if data:
            self.write_data(data)

    def write_docs(self, docs, batch_size=800):
        """Write a list of docs to database through bulk insert.

        Args:
            docs: An iterable of docs.
            batch_size: Flush to the database once at least this many rows have accumulated.
        """
        data = []
        for doc in docs:
            data.extend(doc._.get_data(self.doc_dtype, attrs=self.cols, as_rows=True))
            if len(data) >= batch_size:
                self.write_data(data)
                data = []
        # Flush whatever is left over after the last full batch.
        if data:
            self.write_data(data)

    def write_data(self, data):
        """Insert ``data`` (rows matching ``self.cols``) using the prepared insert query."""
        self.db.write(self.insert_query, data)

    def close(self):
        """Close the underlying DbConnect."""
        self.db.close()

__init__(db_conn, destination_table, cols=None, col_types=None, doc_dtype='ents', create_table=False, drop_existing=False, write_batch_size=100)

Create a new DbWriter object.

Parameters:

Name Type Description Default
db_conn

A medspacy.io.DbConnect object

required
destination_table

The name of the table to write to

required
cols opt

The names of the columns of the destination table. These should align with attributes extracted by DocConsumer and stored in doc._.data. A set of default values can be accessed by:

DbWriter.get_default_cols()

None
col_types opt

The SQL data types of the table columns. They should correspond 1:1 with cols. A set of default values can be accessed by:

DbWriter.get_default_col_types()

None
doc_dtype

The type of data from DocConsumer to write from a doc. Either ("ents", "section", "context", or "doc")

'ents'
create_table bool

Whether to create a table

False
Source code in medspacy/io/db_writer.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def __init__(
        self,
        db_conn,
        destination_table,
        cols=None,
        col_types=None,
        doc_dtype="ents",
        create_table=False,
        drop_existing=False,
        write_batch_size=100,
):
    """Create a new DbWriter object.

    Args:
        db_conn: A medspacy.io.DbConnect object
        destination_table: The name of the table to write to
        cols (opt): The names of the columns of the destination table. These should align with attributes extracted
            by DocConsumer and stored in doc._.data. A set of default values can be accessed by:
            >>> DbWriter.get_default_cols()
        col_types (opt): The sql data types of the table columns. They should correspond 1:1 with cols.
            A set of default values can be accessed by:
            >>> DbWriter.get_default_col_types()
        doc_dtype: The type of data from DocConsumer to write from a doc.
            Either ("ents", "section", "context", or "doc")
        create_table (bool): Whether to create a table
        drop_existing (bool): Stored as ``self.drop_existing``.
        write_batch_size: Stored as ``self.batch_size``.

    Raises:
        ValueError: If ``col_types`` is given without ``cols``.
    """
    # Validate the dtype first: the default lookups below index by doc_dtype, so an
    # unsupported value would otherwise fail with a bare KeyError before this check ran.
    _validate_dtypes((doc_dtype,))
    self.db = db_conn
    self.destination_table = destination_table
    self._create_table = create_table
    self.drop_existing = drop_existing
    if cols is None:
        if col_types is not None:
            raise ValueError("cols must be specified if col_types is not None.")
        cols = DEFAULT_COLS[doc_dtype]
        col_types = [DEFAULT_COL_TYPES[doc_dtype][col] for col in cols]
    self.cols = cols
    self.col_types = col_types
    self.doc_dtype = doc_dtype
    self.batch_size = write_batch_size

    self.insert_query = ""
    if create_table:
        self.create_table()
    self.make_insert_query()

write(docs)

Write a list of docs or doc to a database.

Source code in medspacy/io/db_writer.py
160
161
162
163
164
165
def write(self, docs: Union[Doc, List[Doc]]):
    """Write either a single doc or a collection of docs to the database."""
    # A lone Doc goes to write_doc; anything else is handled as a collection of docs.
    handler = self.write_doc if isinstance(docs, Doc) else self.write_docs
    handler(docs)

write_doc(doc)

Write a doc to a database.

Source code in medspacy/io/db_writer.py
167
168
169
170
def write_doc(self, doc):
    """Write a doc to a database. Nothing is written if the doc produces no rows."""
    data = list(doc._.get_data(self.doc_dtype, attrs=self.cols, as_rows=True))
    # Never hand an empty batch to the database layer; write_docs applies the same
    # guard to its final batch. pyodbc is understood to reject an empty parameter
    # list passed to executemany (see pyodbc docs).
    if data:
        self.write_data(data)

write_docs(docs, batch_size=800)

write a list of docs to database through bulk insert

Source code in medspacy/io/db_writer.py
172
173
174
175
176
177
178
179
180
181
182
def write_docs(self, docs, batch_size=800):
    """Bulk-insert the rows of several docs, flushing once batch_size rows have accumulated."""
    pending = []
    for doc in docs:
        rows = doc._.get_data(self.doc_dtype, attrs=self.cols, as_rows=True)
        pending.extend(rows)
        if len(pending) < batch_size:
            continue
        # Enough rows collected: write them out and start a fresh batch.
        self.write_data(pending)
        pending = []
    # Write any remainder smaller than a full batch.
    if pending:
        self.write_data(pending)

DocConsumer

A DocConsumer object will consume a spacy doc and output rows based on a configuration provided by the user.

This component extracts structured information from a Doc. Information is stored in doc._.data, which is a nested dictionary. The outer keys represent the data type and can be one or more of: - "ents": data about the spans in doc.ents such as the text, label, context attributes, section information, or custom attributes - "group": data about spans in the span group named by span_group_name - "section": data about each section, such as its category, title and body - "context": data about entity-modifier pairs extracted by ConText - "doc": a single doc-level representation. By default only doc.text is extracted, but other attributes may be specified

Once processed, a doc's data can be accessed either by:
    - doc._.data
    - doc._.get_data(dtype=...)
    - doc._.ent_data
    - doc._.to_dataframe(dtype=...)
Source code in medspacy/io/doc_consumer.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
@Language.factory("medspacy_doc_consumer")
class DocConsumer:
    """
    A DocConsumer object will consume a spacy doc and output rows based on a configuration provided by the user.

    This component extracts structured information from a Doc. Information is stored in doc._.data, which is a
        nested dictionary. The outer keys represent the data type and can be one or more of:
            - "ents": data about the spans in doc.ents such as the text, label,
                context attributes, section information, or custom attributes
            - "group": data about spans in the span group named by `span_group_name`
            - "section": data about each section, such as its category, title and body
            - "context": data about entity-modifier pairs extracted by ConText
            - "doc": a single doc-level representation. By default only doc.text is extracted, but other attributes may
                be specified

        Once processed, a doc's data can be accessed either by:
            - doc._.data
            - doc._.get_data(dtype=...)
            - doc._.ent_data
            - doc._.to_dataframe(dtype=...)
    """

    def __init__(
        self,
        nlp,
        name: str = "medspacy_doc_consumer",
        dtypes: Tuple = ("ents",),
        dtype_attrs: Dict = None,
        span_group_name: str = "medspacy_spans",
    ):
        """
        Creates a new DocConsumer.

        Args:
            nlp: A spaCy model
            name: The name of the component.
            dtypes: Either a tuple of data types to collect or the string "all". Default ("ents",). Valid  options are:
                "ents", "group", "section", "context", "doc".
            dtype_attrs: An optional dictionary mapping the data types in dtypes to a list of attributes. If None, will
                set defaults for each dtype. Any dtype requested in `dtypes` but absent from this dictionary also
                receives its default attributes. Default values for each dtype can be retrieved by the class method
                `DocConsumer.get_default_attrs()`.
            span_group_name: the name of the span group used when dtypes contains "group". At this time, only one span
                group is supported.

        Raises:
            ValueError: If dtypes is neither "all" nor a tuple, contains an unsupported dtype, or if the "section"
                attributes in dtype_attrs are not allowed.
        """
        self.nlp = nlp
        self.name = name
        self._span_group_name = span_group_name
        if not isinstance(dtypes, tuple):
            if dtypes == "all":
                dtypes = tuple(ALLOWED_DATA_TYPES)
            else:
                raise ValueError(
                    "dtypes must be either 'all' or a tuple, not {0}".format(dtypes)
                )
        for dtype in dtypes:
            if dtype not in ALLOWED_DATA_TYPES:
                raise ValueError(
                    "Invalid dtypes. Supported dtypes are {0}, not {1}".format(
                        ALLOWED_DATA_TYPES, dtype
                    )
                )
            if dtype == "section":
                self.validate_section_attrs(dtype_attrs)
        self.dtypes = dtypes
        self.dtype_attrs = dtype_attrs

        if self.dtype_attrs is None:
            self._set_default_attrs()
        else:
            # __call__ indexes dtype_attrs by every requested dtype, so a partial mapping
            # would raise KeyError at processing time. Fill the gaps with the defaults,
            # working on a copy so the caller's dictionary is left untouched.
            defaults = self.get_default_attrs(self.dtypes)
            missing = {
                dtype: defaults[dtype]
                for dtype in self.dtypes
                if dtype not in self.dtype_attrs and dtype in defaults
            }
            if missing:
                self.dtype_attrs = {**self.dtype_attrs, **missing}

    @classmethod
    def get_default_attrs(cls, dtypes: Optional[Tuple] = None):
        """
        Gets the default attributes available to each type specified.

        Args:
            dtypes: Optional tuple containing "ents", "group", "context", "section", or "doc". If None, all will be
                returned. A single string is treated as a one-element tuple.

        Returns:
            The attributes the doc consumer will output for each of the specified types in `dtypes`.

        Raises:
            ValueError: If any requested dtype is not in ALLOWED_DATA_TYPES.
        """
        if dtypes is None:
            dtypes = ALLOWED_DATA_TYPES
        else:
            if isinstance(dtypes, str):
                dtypes = (dtypes,)
            for dtype in dtypes:
                if dtype not in ALLOWED_DATA_TYPES:
                    # Format a single message; passing two arguments to ValueError
                    # renders the error text as a tuple.
                    raise ValueError(
                        "Invalid dtypes. Supported dtypes are {0}, not {1}".format(
                            ALLOWED_DATA_TYPES, dtype
                        )
                    )
        dtype_attrs = {
            dtype: list(attrs)
            for (dtype, attrs) in DEFAULT_ATTRS.items()
            if dtype in dtypes
        }
        return dtype_attrs

    def _set_default_attrs(self):
        """
        Sets self.dtype_attrs to the default attributes for self.dtypes.
        """
        self.dtype_attrs = self.get_default_attrs(self.dtypes)

    def validate_section_attrs(self, attrs):
        """
        Validate that section attributes are either not specified or are valid attribute names.

        Args:
            attrs: The dtype_attrs dictionary, or None.

        Returns:
            True when validation passes.

        Raises:
            ValueError: If attrs["section"] contains names outside ALLOWED_SECTION_ATTRS.
        """
        if attrs is None:
            return True
        if "section" not in attrs:
            return True
        diff = set(attrs["section"]).difference(ALLOWED_SECTION_ATTRS)
        if diff:
            raise ValueError("Invalid section dtype_attrs specified: {0}".format(diff))
        return True

    @staticmethod
    def _get_attr(obj, attr):
        """Return obj.<attr>, falling back to the custom extension obj._.<attr>."""
        try:
            return getattr(obj, attr)
        except AttributeError:
            return getattr(obj._, attr)

    def __call__(self, doc):
        """
        Call the doc consumer on a doc and assign the data.

        Args:
            doc: The Doc to process.

        Returns:
            The processed Doc.
        """
        # Start every configured dtype with one empty list per attribute.
        data = dict()
        for dtype, attrs in self.dtype_attrs.items():
            data.setdefault(dtype, OrderedDict())
            for attr in attrs:
                data[dtype][attr] = list()
        if "ents" in self.dtypes:
            for ent in doc.ents:
                for attr in self.dtype_attrs["ents"]:
                    data["ents"][attr].append(self._get_attr(ent, attr))
        if "group" in self.dtypes:
            for span in doc.spans[self._span_group_name]:
                for attr in self.dtype_attrs["group"]:
                    data["group"][attr].append(self._get_attr(span, attr))
        if "context" in self.dtypes:
            for (ent, modifier) in doc._.context_graph.edges:
                self.add_context_edge_attributes(ent, modifier, data["context"], doc)
        if "section" in self.dtypes:
            for section in doc._.sections:
                self.add_section_attributes(section, data["section"], doc)
        if "doc" in self.dtypes:
            for attr in self.dtype_attrs["doc"]:
                data["doc"][attr].append(self._get_attr(doc, attr))

        doc._.data = data
        return doc

    def add_context_edge_attributes(
        self, ent: Span, modifier: ConTextModifier, context_data, doc
    ):
        """
        Append one value per configured "context" attribute for an entity-modifier pair.

        Args:
            ent: The entity of the edge.
            modifier: The modifier of the edge.
            context_data: Mapping of attribute name to list; one value is appended to each configured attribute.
            doc: The Doc that the modifier's token offsets index into.

        Raises:
            ValueError: If an attribute is neither pre-defined nor available on the entity or its extensions.
        """
        span_tup = modifier.modifier_span
        span = doc[span_tup[0] : span_tup[1]]
        scope_tup = modifier.scope_span
        scope = doc[scope_tup[0] : scope_tup[1]]
        # Pre-defined attribute names mapped to lazy getters, so only requested values are read.
        predefined = {
            "ent_text": lambda: ent.text,
            "ent_label_": lambda: ent.label_,
            "ent_start_char": lambda: ent.start_char,
            "ent_end_char": lambda: ent.end_char,
            "modifier_text": lambda: span.text,
            "modifier_category": lambda: modifier.category,
            "modifier_direction": lambda: modifier.direction,
            "modifier_start_char": lambda: span.start_char,
            "modifier_end_char": lambda: span.end_char,
            "modifier_scope_start_char": lambda: scope.start_char,
            "modifier_scope_end_char": lambda: scope.end_char,
        }
        for attr in self.dtype_attrs["context"]:
            if attr in predefined:
                val = predefined[attr]()
            else:
                # if specified attribute is not one of these standard values, check the entity to see if it's an entity value
                try:
                    val = getattr(ent, attr)
                except AttributeError:
                    try:
                        val = getattr(ent._, attr)
                    except AttributeError:
                        raise ValueError(f"Attributes for dtype 'context' must be either "
                                         f"a registered custom Span attribute (i.e., Span._.attr) or one of these pre-defined values: "
                                         f"{ALLOWED_CONTEXT_ATTRS}. \nYou passed in '{attr}'")
            context_data[attr].append(val)

    def add_section_attributes(self, section, section_data, doc):
        """
        Append one value per configured "section" attribute for a section.

        Args:
            section: The section to describe.
            section_data: Mapping of attribute name to list; one value is appended to each configured attribute.
            doc: The Doc that the section's token offsets index into.
        """
        wanted = self.dtype_attrs["section"]
        section_title_tup = section.title_span
        section_body_tup = section.body_span
        section_title = doc[section_title_tup[0] : section_title_tup[1]]
        section_body = doc[section_body_tup[0] : section_body_tup[1]]
        # Allow for null sections: without a category the title is recorded as
        # None with start/end offsets of 0.
        has_title = section.category is not None
        if "section_category" in wanted:
            section_data["section_category"].append(section.category)
        if "section_title_text" in wanted:
            section_data["section_title_text"].append(
                section_title.text if has_title else None
            )
        if "section_title_start_char" in wanted:
            section_data["section_title_start_char"].append(
                section_title.start_char if has_title else 0
            )
        if "section_title_end_char" in wanted:
            section_data["section_title_end_char"].append(
                section_title.end_char if has_title else 0
            )
        if "section_body" in wanted:
            section_data["section_body"].append(section_body.text)
        if "section_body_start_char" in wanted:
            section_data["section_body_start_char"].append(section_body.start_char)
        if "section_body_end_char" in wanted:
            section_data["section_body_end_char"].append(section_body.end_char)
        if "section_parent" in wanted:
            section_data["section_parent"].append(section.parent)

__call__(doc)

Call the doc consumer on a doc and assign the data.

Parameters:

Name Type Description Default
doc

The Doc to process.

required

Returns:

Type Description

The processed Doc.

Source code in medspacy/io/doc_consumer.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
def __call__(self, doc):
    """
    Collect the configured attributes from a doc and store them on doc._.data.

    Args:
        doc: The Doc to process.

    Returns:
        The same Doc, with doc._.data assigned.
    """

    def read_attr(target, name):
        # Prefer a native attribute; otherwise look in the custom extension namespace.
        try:
            return getattr(target, name)
        except AttributeError:
            return getattr(target._, name)

    # One ordered mapping of attribute name -> empty list for every configured dtype.
    collected = {}
    for dtype, attrs in self.dtype_attrs.items():
        columns = collected.setdefault(dtype, OrderedDict())
        for attr in attrs:
            columns[attr] = []

    requested = self.dtypes
    if "ents" in requested:
        for ent in doc.ents:
            for attr in self.dtype_attrs["ents"]:
                collected["ents"][attr].append(read_attr(ent, attr))
    if "group" in requested:
        for span in doc.spans[self._span_group_name]:
            for attr in self.dtype_attrs["group"]:
                collected["group"][attr].append(read_attr(span, attr))
    if "context" in requested:
        for ent, modifier in doc._.context_graph.edges:
            self.add_context_edge_attributes(
                ent, modifier, collected["context"], doc
            )
    if "section" in requested:
        for section in doc._.sections:
            self.add_section_attributes(section, collected["section"], doc)
    if "doc" in requested:
        for attr in self.dtype_attrs["doc"]:
            collected["doc"][attr].append(read_attr(doc, attr))

    doc._.data = collected
    return doc

__init__(nlp, name='medspacy_doc_consumer', dtypes=('ents',), dtype_attrs=None, span_group_name='medspacy_spans')

Creates a new DocConsumer.

Parameters:

Name Type Description Default
nlp

A spaCy model

required
dtypes Tuple

Either a tuple of data types to collect or the string "all". Default ("ents",). Valid options are: "ents", "group", "section", "context", "doc".

('ents',)
dtype_attrs Dict

An optional dictionary mapping the data types in dtypes to a list of attributes. If None, will set defaults for each dtype. Attributes for "ents", "group", and "doc" may be customized by adding either native or custom attributes (i.e., ent._....). "context" and "section" are not customizable at this time. Default values for each dtype can be retrieved by the class method `DocConsumer.get_default_attrs()`.

None
span_group_name str

the name of the span group used when dtypes contains "group". At this time, only one span group is supported.

'medspacy_spans'
Source code in medspacy/io/doc_consumer.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def __init__(
    self,
    nlp,
    name: str = "medspacy_doc_consumer",
    dtypes: Tuple = ("ents",),
    dtype_attrs: Dict = None,
    span_group_name: str = "medspacy_spans",
):
    """
    Creates a new DocConsumer.

    Args:
        nlp: A spaCy model.
        name: The component name, stored as `self.name`.
        dtypes: Either a tuple of data types to collect or the string "all". Default ("ents",). Valid options
            are: "ents", "group", "section", "context", "doc".
        dtype_attrs: An optional dictionary mapping the data types in `dtypes` to a list of attributes. If None,
            defaults are set for each dtype. Attributes for "ents", "group", and "doc" may be customized by
            adding either native or custom attributes (i.e., ent._....). "context" and "section" are not
            customizable at this time. Default values for each dtype can be retrieved with the class method
            `DocConsumer.get_default_attrs()`.
        span_group_name: The name of the span group used when `dtypes` contains "group". At this time, only one
            span group is supported.

    Raises:
        ValueError: If `dtypes` is neither "all" nor a tuple, if it contains an unsupported data type, or if
            `dtype_attrs` requests unsupported section attributes.
    """
    self.nlp = nlp
    self.name = name
    self._span_group_name = span_group_name

    # "all" is shorthand for every allowed data type; any other value must already be a tuple.
    if dtypes == "all":
        dtypes = tuple(ALLOWED_DATA_TYPES)
    elif not isinstance(dtypes, tuple):
        raise ValueError(
            "dtypes must be either 'all' or a tuple, not {0}".format(dtypes)
        )

    # Validate in a single pass so errors surface in the order the data types were requested.
    for requested in dtypes:
        if requested not in ALLOWED_DATA_TYPES:
            raise ValueError(
                "Invalid dtypes. Supported dtypes are {0}, not {1}".format(
                    ALLOWED_DATA_TYPES, requested
                )
            )
        if requested == "section":
            self.validate_section_attrs(dtype_attrs)

    self.dtypes = dtypes
    self.dtype_attrs = dtype_attrs
    if self.dtype_attrs is None:
        # No attributes supplied: fall back to the defaults for the requested data types.
        self._set_default_attrs()

_set_default_attrs()

Sets the default attributes for the configured data types.

Source code in medspacy/io/doc_consumer.py
156
157
158
159
160
def _set_default_attrs(self):
    """
    Sets `self.dtype_attrs` to the default attributes for the data types in `self.dtypes`.
    """
    defaults = self.get_default_attrs(self.dtypes)
    self.dtype_attrs = defaults

get_default_attrs(dtypes=None) classmethod

Gets the default attributes available to each type specified.

Parameters:

Name Type Description Default
dtypes Optional[Tuple]

Optional tuple containing "ents", "group", "context", "section", or "doc". If None, all will be returned.

None

Returns:

Type Description

The attributes the doc consumer will output for each of the specified types in dtypes.

Source code in medspacy/io/doc_consumer.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
@classmethod
def get_default_attrs(cls, dtypes: Optional[Tuple] = None):
    """
    Gets the default attributes available to each type specified.

    Args:
        dtypes: Optional tuple containing "ents", "group", "context", "section", or "doc". A single data type
            may also be passed as a plain string. If None, the defaults for all data types are returned.

    Returns:
        A dictionary mapping each requested data type to a new list of its default attribute names, so callers
        may modify the lists without altering the shared defaults.

    Raises:
        ValueError: If any requested data type is not in `ALLOWED_DATA_TYPES`.
    """
    if dtypes is None:
        dtypes = ALLOWED_DATA_TYPES
    else:
        # A bare string would otherwise be iterated character by character.
        if isinstance(dtypes, str):
            dtypes = (dtypes,)
        for dtype in dtypes:
            if dtype not in ALLOWED_DATA_TYPES:
                # Build a single message string; passing several positional arguments to ValueError makes the
                # error render as a tuple.
                raise ValueError(
                    "Invalid dtype. Supported dtypes are {0}, not {1}".format(
                        ALLOWED_DATA_TYPES, dtype
                    )
                )
    dtype_attrs = {
        dtype: list(attrs)
        for (dtype, attrs) in DEFAULT_ATTRS.items()
        if dtype in dtypes
    }
    return dtype_attrs

validate_section_attrs(attrs)

Validate that section attributes are either not specified or are valid attribute names.

Source code in medspacy/io/doc_consumer.py
162
163
164
165
166
167
168
169
170
171
172
173
def validate_section_attrs(self, attrs):
    """
    Check that any section attributes requested in `attrs` are allowed.

    Args:
        attrs: Either None or a dictionary mapping data types to attribute names. Only the "section" entry,
            if present, is inspected.

    Returns:
        True when `attrs` is None, has no "section" entry, or names only allowed section attributes.

    Raises:
        ValueError: If the "section" entry contains names that are not in `ALLOWED_SECTION_ATTRS`.
    """
    if attrs is not None and "section" in attrs:
        unknown = set(attrs["section"]).difference(ALLOWED_SECTION_ATTRS)
        if unknown:
            raise ValueError("Invalid section dtype_attrs specified: {0}".format(unknown))
    return True

Pipeline

The Pipeline class executes a batch process of reading texts, processing them with a spaCy model, and writing the results back to a database.

Source code in medspacy/io/pipeline.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Language.factory("medspacy_pipeline")
class Pipeline:
    """The Pipeline class executes a batch process of reading texts, processing them with a spaCy model, and writing
    the results back to a database.
    """

    def __init__(self, nlp, reader, writer, name="medspacy_pipeline", dtype="ent"):
        """Create a new Pipeline object.

        Args:
            nlp: A spaCy model
            reader: A DbReader object
            writer: A DbWriter object
            name: The component name, stored as `self.name`.
            dtype: The DocConsumer data type to write to a database. Must be a member of
                ALLOWED_DATA_TYPES. Default "ent".
                NOTE(review): the DocConsumer documentation lists "ents" (plural) as the valid option;
                confirm that the default "ent" is actually accepted by ALLOWED_DATA_TYPES.

        Raises:
            ValueError: If `dtype` is not in ALLOWED_DATA_TYPES.
        """

        self.reader = reader
        self.writer = writer
        self.name = name
        self.nlp = nlp
        self.dtype = dtype
        if dtype not in ALLOWED_DATA_TYPES:
            raise ValueError(
                "Invalid dtypes. Supported dtypes are {0}, not {1}".format(
                    ALLOWED_DATA_TYPES, dtype
                )
            )

    def process(self):
        """Run a pipeline by reading a set of texts from a source table, processing them with nlp,
        and writing doc._.data back to the destination table.

        Rows are accumulated across documents and flushed to the writer whenever at least
        `self.writer.batch_size` rows are pending; any remainder is written once the reader is exhausted.
        The reader (and the writer, when it uses a different connection) is closed even if reading,
        processing or writing raises.
        """
        pending = []
        try:
            query_result = self.reader.read()
            while query_result:
                # Transpose the result rows into columns: the first column holds the identifiers,
                # the second the texts.
                columns = list(zip(*query_result))
                ids, texts = columns[0], columns[1]

                for text_id, doc in zip(ids, self.nlp.pipe(texts)):
                    # Get the data as rows of tuples and prepend the identifier column
                    rows = [
                        (text_id,) + row_data
                        for row_data in doc._.get_data(self.dtype, as_rows=True)
                    ]
                    pending.extend(rows)
                    if pending and len(pending) >= self.writer.batch_size:
                        self.writer.write_data(pending)
                        pending = []
                query_result = self.reader.read()

            # Skip the final write when no rows are left: an empty batch has nothing to insert.
            if pending:
                self.writer.write_data(pending)
        finally:
            self.reader.close()
            # A shared connection has already been closed through the reader.
            if self.writer.db.conn != self.reader.db.conn:
                self.writer.close()

__init__(nlp, reader, writer, name='medspacy_pipeline', dtype='ent')

Create a new Pipeline object. Args: reader: A DbReader object. writer: A DbWriter object. nlp: A spaCy model. dtype: The DocConsumer data type to write to a database. Default "ent". Valid options are ("ent", "section", "context", "doc").

Source code in medspacy/io/pipeline.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def __init__(self, nlp, reader, writer, name="medspacy_pipeline", dtype="ent"):
    """Create a new Pipeline object.

    Args:
        nlp: A spaCy model
        reader: A DbReader object
        writer: A DbWriter object
        name: The component name, stored as `self.name`.
        dtype: The DocConsumer data type to write to a database. Must be a member of
            ALLOWED_DATA_TYPES. Default "ent".

    Raises:
        ValueError: If `dtype` is not in ALLOWED_DATA_TYPES.
    """
    self.reader, self.writer = reader, writer
    self.name, self.nlp = name, nlp
    self.dtype = dtype

    # Attributes are stored first; the data type is validated afterwards.
    if dtype in ALLOWED_DATA_TYPES:
        return
    raise ValueError(
        "Invalid dtypes. Supported dtypes are {0}, not {1}".format(
            ALLOWED_DATA_TYPES, dtype
        )
    )

process()

Run a pipeline by reading a set of texts from a source table, processing them with nlp, and writing doc._.data back to the destination table.

Source code in medspacy/io/pipeline.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def process(self):
    """Run a pipeline by reading a set of texts from a source table, processing them with nlp,
    and writing doc._.data back to the destination table.

    Rows are accumulated across documents and flushed to the writer whenever at least
    `self.writer.batch_size` rows are pending; any remainder is written once the reader is exhausted.
    The reader (and the writer, when it uses a different connection) is closed even if reading,
    processing or writing raises.
    """
    pending = []
    try:
        query_result = self.reader.read()
        while query_result:
            # Transpose the result rows into columns: the first column holds the identifiers,
            # the second the texts.
            columns = list(zip(*query_result))
            ids, texts = columns[0], columns[1]

            for text_id, doc in zip(ids, self.nlp.pipe(texts)):
                # Get the data as rows of tuples and prepend the identifier column
                rows = [
                    (text_id,) + row_data
                    for row_data in doc._.get_data(self.dtype, as_rows=True)
                ]
                pending.extend(rows)
                if pending and len(pending) >= self.writer.batch_size:
                    self.writer.write_data(pending)
                    pending = []
            query_result = self.reader.read()

        # Skip the final write when no rows are left: an empty batch has nothing to insert.
        if pending:
            self.writer.write_data(pending)
    finally:
        self.reader.close()
        # A shared connection has already been closed through the reader.
        if self.writer.db.conn != self.reader.db.conn:
            self.writer.close()