# Source code for udapi.core.basereader

"""BaseReader is the base class for all reader blocks."""
import gc
import re
import logging
from udapi.core.block import Block
from udapi.core.files import Files

# pylint: disable=too-many-instance-attributes


class BaseReader(Block):
    """Base class for all reader blocks.

    Subclasses normally implement only `read_tree()` (and optionally the
    faster `read_trees()`); this class takes care of iterating over the input
    files, grouping the loaded trees into bundles and splitting or merging
    them into documents.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, files='-', filehandle=None, zone='keep', bundles_per_doc=0,
                 encoding='utf-8-sig', sent_id_filter=None, split_docs=False,
                 ignore_sent_id=False, merge=False, max_docs=0, **kwargs):
        """Create a reader.

        Args:
            files: input specification passed to `Files(filenames=...)`.
                Ignored (replaced by None) when `filehandle` is given.
            filehandle: an already opened file handle to read from.
            zone: zone to assign to every loaded tree; the default 'keep'
                leaves the zone (possibly parsed from sent_id) untouched.
            bundles_per_doc: if non-zero, end the current document once this
                many bundles have been filled. Also disables the fast loading.
            encoding: encoding passed to `Files`.
            sent_id_filter: if not None, it is converted to `str` and compiled
                as a regular expression; `filtered_read_tree()` then returns
                only trees whose sent_id matches it (using `re.match`).
            split_docs: if true, a tree marked with `newdoc` ends the current
                document (when it already contains a bundle) and is kept
                in a buffer for the next document.
            ignore_sent_id: if true, the sent_id of each loaded tree is reset
                to None, so it is not used for grouping trees into bundles.
            merge: if true, continue with the next input file
                within the same document.
            max_docs: if non-zero, stop loading once this many documents
                have been counted.
            **kwargs: passed to the `Block` constructor.
        """
        super().__init__(**kwargs)
        if filehandle is not None:
            files = None
        self.files = Files(filenames=files, filehandle=filehandle, encoding=encoding)
        self.zone = zone
        self.bundles_per_doc = bundles_per_doc
        # A tree which was read but belongs to the next document.
        self._buffer = None
        self.finished = False
        self.sent_id_filter = None
        if sent_id_filter is not None:
            self.sent_id_filter = re.compile(str(sent_id_filter))
            logging.debug('Using sent_id_filter=%s', sent_id_filter)
        self.split_docs = split_docs
        self.ignore_sent_id = ignore_sent_id
        self.merge = merge
        self.max_docs = max_docs
        self._docs_loaded = 0
        # `global.Entity` is a header stored in a comment before the first tree
        # of each document in a given CoNLL-U file.
        # In Udapi, it is stored in `document.meta['global.Entity']`,
        # but for technical reasons, we need to temporarily store it in here, the reader.
        # The reason is that `read.Conllu` uses a fast loading interface with `read_trees()`,
        # which reads all the trees in a file at once, but it does not have access
        # to the document instance, it just returns a sequence of trees
        # (which may be split into multiple documents if `bundles_per_doc` is set).
        # So `read.Conllu` cannot store the `global.Entity`
        # in `document.meta['global.Entity']` where it belongs.
        self._global_entity = None

    @staticmethod
    def is_multizone_reader():
        """Can this reader read bundles which contain more zones?

        This implementation always returns True.
        If a subclass supports just one zone in file (e.g. `read.Sentences`),
        this method should be overridden to return False, so `process_document`
        can take advantage of this knowledge and optimize the reading
        (no buffer needed even if `bundles_per_doc` specified).
        """
        return True

    @property
    def filehandle(self):
        """Property with the current file handle."""
        return self.files.filehandle

    @property
    def filename(self):
        """Property with the current filename."""
        return self.files.filename

    @property
    def file_number(self):
        """Property with the current file number (1-based)."""
        return self.files.file_number

    def next_filehandle(self):
        """Go to the next file and return its filehandle."""
        return self.files.next_filehandle()

    def read_tree(self):
        """Load one (more) tree from self.filehandle and return its root.

        This method must be overridden in all readers.
        Usually it is the only method that needs to be implemented.
        The implementation in this base class raises `NotImplementedError`.
        """
        raise NotImplementedError("Class %s doesn't implement read_tree"
                                  % self.__class__.__name__)

    def read_trees(self):
        """Load all trees from self.filehandle and return a list of their roots.

        This method may be overridden in a reader if a faster alternative
        to read_tree() is needed.
        The implementation in this base class raises `NotImplementedError`.
        """
        raise NotImplementedError("Class %s doesn't implement read_trees"
                                  % self.__class__.__name__)

    def filtered_read_tree(self):
        """Load and return one more tree matching the `sent_id_filter`.

        This method uses `read_tree()` internally.
        This is the method called by `process_document`.
        Returns None when `read_tree()` returns None.
        """
        tree = self.read_tree()
        if self.sent_id_filter is None:
            return tree

        # If a skipped tree started a new document, remember its `newdoc`
        # so it can be transferred to the first tree which is not skipped.
        skipped_newdoc = None
        while True:
            if tree is None:
                return None
            if self.sent_id_filter.match(tree.sent_id) is not None:
                if skipped_newdoc and not tree.newdoc:
                    tree.newdoc = skipped_newdoc
                return tree
            logging.debug('Skipping sentence %s as it does not match the sent_id_filter %s.',
                          tree.sent_id, self.sent_id_filter)
            if tree.newdoc:
                skipped_newdoc = tree.newdoc
            tree = self.read_tree()

    def try_fast_load(self, document):
        """Try to use self.read_trees() if possible and return True, otherwise False.

        The fast loading is not attempted (False is returned) if the document
        already contains bundles or if any of `bundles_per_doc`,
        `sent_id_filter` and `split_docs` is set. False is also returned
        if the reader does not implement `read_trees()`.
        """
        if document.bundles or self.bundles_per_doc or self.sent_id_filter or self.split_docs:
            return False
        if self.filehandle is None:
            filehandle = self.next_filehandle()
            if filehandle is None:
                self.finished = True
                return True

        # Every branch of this loop ends with `return`,
        # except when `merge` is set and there is another file to be read.
        while True:
            try:
                trees = self.read_trees()
            except NotImplementedError:
                return False

            document.meta['loaded_from'] = self.filename
            document.meta['global.Entity'] = self._global_entity
            if trees and trees[0].newdoc and trees[0].newdoc is not True:
                document.meta["docname"] = trees[0].newdoc

            bundle, last_bundle_id = None, ''
            for root in trees:
                if root is None:
                    continue
                if root.newdoc:
                    if self.max_docs and self._docs_loaded >= self.max_docs:
                        self.finished = True
                        return True
                    self._docs_loaded += 1
                add_to_the_last_bundle = False

                if self.ignore_sent_id:
                    root._sent_id = None
                elif root._sent_id is not None:
                    # sent_id has the form "bundle_id" or "bundle_id/zone".
                    parts = root._sent_id.split('/', 1)
                    bundle_id = parts[0]
                    if len(parts) == 2:
                        root.zone = parts[1]
                    add_to_the_last_bundle = bundle_id == last_bundle_id
                    last_bundle_id = bundle_id
                if self.zone != 'keep':
                    root.zone = self.zone

                # assign new/next bundle to `bundle` if needed
                if not bundle or not add_to_the_last_bundle:
                    bundle = document.create_bundle()
                    if last_bundle_id != '':
                        bundle.bundle_id = last_bundle_id

                bundle.add_tree(root)

            self.next_filehandle()
            if self.filehandle is None:
                self.finished = True
                return True
            if not self.merge:
                return True

    # pylint: disable=too-many-branches,too-many-statements
    # Maybe the code could be refactored, but it is speed-critical,
    # so benchmarking is needed because calling extra methods may result in slowdown.
    def process_document(self, document):
        """Load trees into `document`, one document per call.

        First, `try_fast_load()` is attempted. If it is not applicable,
        the trees are read one by one with `filtered_read_tree()` and added
        into bundles: bundles already present in the document are filled first,
        new bundles are created afterwards.
        The attribute `finished` is set to True once there is nothing more to read
        or `max_docs` has been reached.
        Garbage collection is disabled during the loading and its original
        state is restored at the end.
        """
        # Temporarily disabling garbage collection makes the loading much faster.
        gc_was_enabled = gc.isenabled()
        gc.disable()
        try:
            if self.try_fast_load(document):
                return
            orig_bundles = document.bundles[:]
            bundle, last_bundle_id = None, ''

            # There may be a tree left in the buffer when reading the last doc.
            if self._buffer:
                root = self._buffer
                self._buffer = None
                if orig_bundles:
                    bundle = orig_bundles.pop(0)
                else:
                    bundle = document.create_bundle()
                if root._sent_id is not None:
                    bundle.bundle_id = root._sent_id.split('/', 1)[0]
                bundle.add_tree(root)
                if root.newdoc:
                    self._docs_loaded += 1
                    if root.newdoc is not True:
                        document.meta["docname"] = root.newdoc
                document.meta['global.Entity'] = self._global_entity
                document.meta['loaded_from'] = self.filename

            if self.filehandle is None and self.next_filehandle() is None:
                self.finished = True
                return

            trees_loaded = 0
            while True:
                root = self.filtered_read_tree()
                if root is None:
                    if (trees_loaded == 0 or self.merge) and self.files.has_next_file():
                        self.next_filehandle()
                        continue
                    self.finished = not self.files.has_next_file()
                    break
                if trees_loaded == 0:
                    document.meta['loaded_from'] = self.filename
                    document.meta['global.Entity'] = self._global_entity
                    # Parameter max_docs is primarily aimed for counting UD docs,
                    # ie. trees with newdoc. However, it could be useful even when working
                    # with files without the newdoc annotations, e.g. when using
                    # files='!*.conllu' or bundles_per_doc, in which case we count
                    # the Udapi documents, so even if the first tree in udapi.Document
                    # does not have newdoc, we count it as a new document.
                    # The cases where newdoc is used are checked further below.
                    if not root.newdoc:
                        if self.max_docs and self._docs_loaded >= self.max_docs:
                            self.finished = True
                            return
                        self._docs_loaded += 1

                add_to_the_last_bundle = False
                trees_loaded += 1

                if self.ignore_sent_id:
                    root._sent_id = None
                elif root._sent_id is not None:
                    # sent_id has the form "bundle_id" or "bundle_id/zone".
                    parts = root._sent_id.split('/', 1)
                    bundle_id = parts[0]
                    if len(parts) == 2:
                        root.zone = parts[1]
                    add_to_the_last_bundle = bundle_id == last_bundle_id
                    last_bundle_id = bundle_id
                if self.zone != 'keep':
                    root.zone = self.zone

                # The `# newdoc` comment in CoNLL-U marks a start of a new document.
                if root.newdoc:
                    if self.max_docs and self._docs_loaded >= self.max_docs:
                        self.finished = True
                        return
                    if not bundle and root.newdoc is not True:
                        document.meta["docname"] = root.newdoc
                    if bundle and self.split_docs:
                        self._buffer = root
                        if orig_bundles:
                            logging.warning("split_docs=1 but the doc had contained %d bundles",
                                            len(orig_bundles))
                        self.finished = False
                        return
                    self._docs_loaded += 1

                # assign new/next bundle to `bundle` if needed
                if not bundle or not add_to_the_last_bundle:
                    if self.bundles_per_doc and bundle and self.bundles_per_doc == bundle.number:
                        self._buffer = root
                        if orig_bundles:
                            logging.warning("bundles_per_doc=%d but the doc had contained %d bundles",
                                            self.bundles_per_doc, len(orig_bundles))
                        return

                    if orig_bundles:
                        bundle = orig_bundles.pop(0)
                        if last_bundle_id and last_bundle_id != bundle.bundle_id:
                            logging.warning('Mismatch in bundle IDs: %s vs %s. Keeping the former one.',
                                            bundle.bundle_id, last_bundle_id)
                    else:
                        bundle = document.create_bundle()
                        if last_bundle_id != '':
                            bundle.bundle_id = last_bundle_id

                bundle.add_tree(root)

                # If bundles_per_doc is set and we have read the specified number of bundles,
                # we should end the current document and return.
                # However, if the reader supports reading multiple zones, we can never know
                # if the current bundle has ended or there will be another tree for this bundle.
                # So in case of multizone readers we need to read one extra tree
                # and store it in the buffer (and include it into the next document).
                if self.bundles_per_doc and self.bundles_per_doc == bundle.number \
                   and not self.is_multizone_reader():
                    return

        # Running garbage collector now takes about 0.36s for a 720k-words (68MiB) conllu file
        # but it makes further processing (where new objects are created) much faster,
        # e.g. 0.85s when creating 65k new nodes.
        # If garbage collection was already disabled (e.g. in udapy), everything is even faster
        # (but no memory with cyclic references is ever freed before the process exits)
        # and in that case we don't want to enable gc here.
        finally:
            if gc_was_enabled:
                gc.enable()
                gc.collect()

    def read_documents(self):
        """Load all documents of this reader and return them as a list."""
        # udapi.core.document imports udapi.block.read.conllu because of doc.load_conllu(filename)
        # and udapi.block.read.conllu loads this module (udapi.core.basereader),
        # so we cannot load udapi.core.document at the beginning of this module.
        from udapi.core.document import Document
        docs = []
        while not self.finished:
            doc = Document()
            self.apply_on_document(doc)
            docs.append(doc)
        return docs