Skip to content

medspacy.io.pipeline

Pipeline

The Pipeline class executes a batch process of reading texts, processing them with a spaCy model, and writing the results back to a database.

Source code in medspacy/io/pipeline.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Language.factory("medspacy_pipeline")
class Pipeline:
    """The Pipeline class executes a batch process of reading texts, processing them with a spaCy model, and writing
    the results back to a database.
    """

    def __init__(self, nlp, reader, writer, name="medspacy_pipeline", dtype="ent"):
        """Create a new Pipeline object.

        Args:
            nlp: A spaCy model; ``process`` calls its ``pipe`` method.
            reader: A DbReader object; ``process`` calls its ``read`` and
                ``close`` methods and compares its ``db.conn``.
            writer: A DbWriter object; ``process`` uses its ``batch_size``,
                ``write_data``, ``close`` and ``db.conn``.
            name: The name of this pipeline. Default "medspacy_pipeline".
            dtype: The DocConsumer data type to write to a database.
                Default "ent".
                Valid options are ("ent", "section", "context", "doc")
                (i.e. the members of ``ALLOWED_DATA_TYPES``).

        Raises:
            ValueError: If ``dtype`` is not in ``ALLOWED_DATA_TYPES``.
        """
        # Validate first so an invalid dtype fails before any state is stored.
        if dtype not in ALLOWED_DATA_TYPES:
            raise ValueError(
                "Invalid dtypes. Supported dtypes are {0}, not {1}".format(
                    ALLOWED_DATA_TYPES, dtype
                )
            )

        self.reader = reader
        self.writer = writer
        self.name = name
        self.nlp = nlp
        self.dtype = dtype

    def process(self):
        """Run a pipeline by reading a set of texts from a source table, processing them with nlp,
        and writing the resulting data rows back to the destination table.

        Batches of rows are pulled from ``self.reader.read()`` until it returns
        an empty (falsy) result. The first column of each row is used as the
        text identifier and the second as the text. Each text is processed
        with ``self.nlp.pipe``. The rows returned by
        ``doc._.get_data(self.dtype, as_rows=True)`` are prefixed with the
        identifier and buffered.

        The buffer is flushed to ``self.writer.write_data`` whenever it holds
        at least ``self.writer.batch_size`` rows. It is flushed once more at
        the end if any rows remain; empty buffers are never written.

        The reader is always closed, even if processing raises. The writer is
        closed as well unless it shares its database connection with the
        reader.
        """
        pending = []
        try:
            query_result = self.reader.read()
            while query_result:
                columns = list(zip(*query_result))
                ids, texts = columns[0], columns[1]

                # nlp.pipe yields docs in input order, so they line up with ids.
                for text_id, doc in zip(ids, self.nlp.pipe(texts)):
                    # Prepend the identifier column to every data row.
                    pending.extend(
                        (text_id, *row_data)
                        for row_data in doc._.get_data(self.dtype, as_rows=True)
                    )
                    if pending and len(pending) >= self.writer.batch_size:
                        self.writer.write_data(pending)
                        # Rebind rather than clear(), in case the writer kept a reference.
                        pending = []
                query_result = self.reader.read()

            if pending:
                self.writer.write_data(pending)
        finally:
            # Release connections even when reading/processing/writing fails.
            self.reader.close()
            if self.writer.db.conn != self.reader.db.conn:
                self.writer.close()

__init__(nlp, reader, writer, name='medspacy_pipeline', dtype='ent')

Create a new Pipeline object. Args: reader: A DbReader object writer: A DbWriter object nlp: A spaCy model name: The name of this pipeline. Default "medspacy_pipeline". dtype: The DocConsumer data type to write to a database. Default "ent". Valid options are ("ent", "section", "context", "doc")

Source code in medspacy/io/pipeline.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def __init__(self, nlp, reader, writer, name="medspacy_pipeline", dtype="ent"):
    """Create a new Pipeline object.

    Args:
        nlp: A spaCy model used to process the texts.
        reader: A DbReader object that supplies the texts to process.
        writer: A DbWriter object that receives the extracted data rows.
        name: The name of this pipeline. Default "medspacy_pipeline".
        dtype: The DocConsumer data type to write to a database.
            Default "ent".
            Valid options are ("ent", "section", "context", "doc")
            (i.e. the members of ``ALLOWED_DATA_TYPES``).

    Raises:
        ValueError: If ``dtype`` is not in ``ALLOWED_DATA_TYPES``.
    """
    # Validate first so an invalid dtype fails before any state is stored.
    if dtype not in ALLOWED_DATA_TYPES:
        raise ValueError(
            "Invalid dtypes. Supported dtypes are {0}, not {1}".format(
                ALLOWED_DATA_TYPES, dtype
            )
        )

    self.reader = reader
    self.writer = writer
    self.name = name
    self.nlp = nlp
    self.dtype = dtype

process()

Run a pipeline by reading a set of texts from a source table, processing them with nlp, and writing doc._.data back to the destination table.

Source code in medspacy/io/pipeline.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def process(self):
    """Run a pipeline by reading a set of texts from a source table, processing them with nlp,
    and writing the resulting data rows back to the destination table.

    Batches of rows are pulled from ``self.reader.read()`` until it returns an
    empty (falsy) result. The first column of each row is used as the text
    identifier and the second as the text. Each text is processed with
    ``self.nlp.pipe``. The rows returned by
    ``doc._.get_data(self.dtype, as_rows=True)`` are prefixed with the
    identifier and buffered.

    The buffer is flushed to ``self.writer.write_data`` whenever it holds at
    least ``self.writer.batch_size`` rows. It is flushed once more at the end
    if any rows remain; empty buffers are never written.

    The reader is always closed, even if processing raises. The writer is
    closed as well unless it shares its database connection with the reader.
    """
    pending = []
    try:
        query_result = self.reader.read()
        while query_result:
            columns = list(zip(*query_result))
            ids, texts = columns[0], columns[1]

            # nlp.pipe yields docs in input order, so they line up with ids.
            for text_id, doc in zip(ids, self.nlp.pipe(texts)):
                # Prepend the identifier column to every data row.
                pending.extend(
                    (text_id, *row_data)
                    for row_data in doc._.get_data(self.dtype, as_rows=True)
                )
                if pending and len(pending) >= self.writer.batch_size:
                    self.writer.write_data(pending)
                    # Rebind rather than clear(), in case the writer kept a reference.
                    pending = []
            query_result = self.reader.read()

        if pending:
            self.writer.write_data(pending)
    finally:
        # Release connections even when reading/processing/writing fails.
        self.reader.close()
        if self.writer.db.conn != self.reader.db.conn:
            self.writer.close()