# Source code for udapi.core.run

"""Class Run parses a scenario and executes it."""
import logging

from udapi.core.document import Document
from udapi.block.read.conllu import Conllu


def _parse_block_name(block_name):
    """
    Split a dotted block name into its package path and its class name.

    The class name is whatever follows the last '.' character; the path is
    everything in front of that dot. A name without any dot yields an empty
    path and the whole name as the class name.

    :param block_name: A block name, e.g. ``read.Conllu``.
    :return: A tuple path, class name.
    :rtype: tuple

    """
    # rpartition splits on the LAST dot and returns an empty head when the
    # separator is absent, which covers the "no path" case as well.
    path, _dot, class_name = block_name.rpartition('.')
    return path, class_name


def _parse_command_line_arguments(scenario):
    """
    Obtain a list of block names and a list of their arguments from the command line arguments list.

    A token without '=' starts a new block; a token of the form ``key=value``
    is stored as an argument of the most recently seen block. Values made
    only of decimal digits are converted to ``int``, all other values stay
    strings.

    :param scenario: An iterable of scenario tokens (strings).
    :return: A tuple block names, block arguments. Both lists have the same
        length; the i-th dict holds the arguments of the i-th block.
    :rtype: tuple
    :raises RuntimeError: If a ``key=value`` token precedes any block name.

    """
    block_names = []
    block_args = []

    for token in scenario:
        logging.debug("Token %s", token)

        # If there is no '=' character in the token, consider it a block name.
        # Initialize the block arguments to an empty dict.
        if '=' not in token:
            logging.debug("- block name")
            block_names.append(token)
            block_args.append({})
            continue

        # Otherwise consider the token to be a block argument in the form
        # key=value.
        logging.debug("- argument")
        if not block_names:
            raise RuntimeError(
                f'Block attribute pair {token!r} without a prior block name')

        # The first '=' in the token separates name from value.
        # The value may contain other '=' characters (e.g. in util.Eval node='node.form = "_"').
        attribute_name, attribute_value = token.split('=', 1)

        # isdecimal() (unlike isdigit()) is true only for characters int()
        # can actually parse, so e.g. a superscript digit stays a string
        # instead of raising ValueError.
        if attribute_value.isdecimal():
            attribute_value = int(attribute_value)

        # Put it as a new argument for the previous block.
        block_args[-1][attribute_name] = attribute_value

    return block_names, block_args


def _blocks_in_a_package(package_name):
    """
    List the blocks found in the given package (best effort).

    Every direct submodule of the package is imported and searched for an
    attribute whose lower-cased name equals the submodule name. The first
    such attribute is reported as ``<package>.<Name>``, with a leading
    ``udapi.block.`` stripped from the package part.

    This helper is used for composing error messages, so it never raises an
    ordinary exception: anything that cannot be located or imported is
    skipped.

    :param package_name: Dotted name of the package to inspect.
    :return: A list of block names; empty if the package cannot be found or
        imported.
    :rtype: list

    """
    import importlib
    import importlib.util
    import pkgutil

    try:
        # find_spec imports the parent package and raises ModuleNotFoundError
        # when that parent is missing, so it has to be guarded as well.
        if not importlib.util.find_spec(package_name):
            return []
        package = importlib.import_module(package_name)
        submodule_names = [m.name for m in pkgutil.iter_modules(package.__path__)]
    except Exception:  # pylint: disable=broad-except
        return []

    pname = package_name
    prefix = "udapi.block."
    if pname.startswith(prefix):
        pname = pname[len(prefix):]

    blocks = []
    for sname in submodule_names:
        # Ignore modules with compilation (or any other import-time) errors,
        # but let KeyboardInterrupt/SystemExit propagate.
        try:
            module = importlib.import_module(f"{package_name}.{sname}")
            bnames = [c for c in dir(module) if c.lower() == sname]
        except Exception:  # pylint: disable=broad-except
            continue
        if bnames:
            blocks.append(f"{pname}.{bnames[0]}")
    return blocks

def _import_blocks(block_names, block_args):
    """
    Parse block names, import particular packages and call the constructor for each object.

    A block name starting with '.' refers to a private module: the leading
    dot is dropped and the lower-cased rest is used as the module path.
    Any other name is looked up under ``udapi.block``, in a module named
    after the lower-cased class name.

    :param block_names: A list of block names to be created.
    :param block_args: A list of block arguments to be passed to block constructor.
        The i-th item is a dict of keyword arguments for the i-th block.
    :return: A list of ``(block_name, block_instance)`` tuples.
    :rtype: list
    :raises ModuleNotFoundError: If the module of a block cannot be found.
        When the enclosing package can be inspected, the message lists the
        blocks available in it.
    :raises ImportError: If the module does not define the requested class.

    """
    # The class is resolved with importlib instead of exec()/eval(): a name
    # bound by exec() in a function's implicit locals is not visible to a
    # later eval() on Python 3.13+ (PEP 667), and no command-line text gets
    # executed as code this way.
    import importlib

    blocks = []

    for block_id, block_name in enumerate(block_names):
        # Importing module dynamically.
        sub_path, class_name = _parse_block_name(block_name)

        if block_name.startswith('.'):
            # Private modules are recognized by a dot at the beginning
            module = block_name.lower()[1:]
        else:
            # Skip empty parts so that a name without a package does not
            # produce an invalid "udapi.block..name" module path.
            module = ".".join(
                part for part in ("udapi.block", sub_path, class_name.lower()) if part)

        logging.debug("Trying to import class %s from module %s", class_name, module)
        try:
            module_object = importlib.import_module(module)
            try:
                block_class = getattr(module_object, class_name)
            except AttributeError as err:
                # Keep the exception type "from ... import ..." would raise.
                raise ImportError(
                    f"cannot import name {class_name!r} from {module!r}") from err
        except ModuleNotFoundError as err:
            package_name = ".".join(module.split(".")[:-1])
            package_blocks = _blocks_in_a_package(package_name)
            if not package_blocks:
                raise
            raise ModuleNotFoundError(
                f"Cannot find block {block_name} (i.e. class {module}.{class_name})\n"
                f"Available block in {package_name} are:\n"
                + "\n".join(package_blocks)) from err
        except Exception:
            logging.warning(f"Cannot import block {block_name} (i.e. class {module}.{class_name})")
            raise

        # Construct the block with its own keyword arguments.
        kwargs = block_args[block_id]
        logging.debug("Trying to construct block %s with arguments %r", block_name, kwargs)
        new_block_instance = block_class(**kwargs)
        blocks.append((block_name, new_block_instance))

    return blocks


class Run(object):
    """Processing unit that processes UD data; typically a sequence of blocks."""

    def __init__(self, args):
        """Initialization of the runner object.

        :param args: command line args as processed by argparser; its
            ``scenario`` attribute must be a non-empty list of tokens.
        :raises TypeError: If ``args.scenario`` is not a list.
        :raises ValueError: If ``args.scenario`` is an empty list.

        """
        self.args = args
        if not isinstance(args.scenario, list):
            raise TypeError(
                f'Expected scenario as list, obtained a {args.scenario!r}')
        if len(args.scenario) < 1:
            raise ValueError('Empty scenario')

    def execute(self):
        """Parse given scenario and execute it.

        The blocks are constructed, ``process_start`` is called on each of
        them, then all blocks are applied to a fresh ``Document`` over and
        over until every reader reports ``finished``, and finally
        ``process_end`` is called on each block.

        """
        # Parse the given scenario from the command line.
        block_names, block_args = _parse_command_line_arguments(self.args.scenario)

        # Import blocks (classes) and construct block instances.
        blocks = _import_blocks(block_names, block_args)

        # Initialize blocks (process_start).
        for _bname, block in blocks:
            block.process_start()

        # Any block exposing a `finished` attribute is treated as a reader.
        readers = [block for _bname, block in blocks if hasattr(block, 'finished')]
        if not readers:
            logging.info('No reader specified, using read.Conllu')
            # NOTE(review): this implicit reader is added after the
            # process_start loop above, so its process_start is not called
            # (its process_end is) -- confirm this is intended.
            conllu_reader = Conllu()
            readers = [conllu_reader]
            blocks = [('read.Conllu', conllu_reader)] + blocks

        # Apply blocks on the data, one document per round, until all the
        # readers are finished.
        finished = False
        while not finished:
            document = Document()
            logging.info(" ---- ROUND ----")
            for bname, block in blocks:
                logging.info("Executing block %s", bname)
                block.apply_on_document(document)

            finished = all(reader.finished for reader in readers)

        # Close blocks (process_end).
        for _bname, block in blocks:
            block.process_end()

    # TODO: better implementation, included Scen
    def scenario_string(self):
        """Return the scenario string (the scenario tokens, one per line)."""
        return "\n".join(self.args.scenario)
def create_block(block, **kwargs):
    """A factory function for creating new block instances (handy for IPython).

    :param block: Name of the block to create, as accepted by ``_import_blocks``.
    :param kwargs: Keyword arguments handed over to the block constructor.
    :return: The newly constructed block instance.

    """
    # _import_blocks returns (name, instance) pairs; a single name was
    # requested, so only the first pair is of interest.
    _name, instance = _import_blocks([block], [kwargs])[0]
    return instance