Source code for drug2ways.graph_traversal

# -*- coding: utf-8 -*-

"""Graph traversal methods."""

from typing import Any, Dict, Iterable, Tuple, List, Union

from networkx import DiGraph

__all__ = [
    'compute_all_paths_multitarget',
    'compute_all_paths_multitarget_dict',
]


def _compute_all_paths_multitarget(graph, source, targets, history, lmax):
    """Compute all paths and store history for each visited node containing number of paths already found.

    :param graph: graph
    :param source: source node
    :param targets: target nodes
    :param history:
    :param lmax:
    :return:
    """
    if lmax == 0:
        return

    paths = [[[0, 0] for _ in targets] for _ in range(lmax)]
    for neighbor in graph.neighbors(source):
        if neighbor in targets or (not targets and graph.nodes[neighbor]['isTarget']):
            target_id = targets.index(neighbor)
            if graph[source][neighbor]['polarity'] == 1:
                paths[0][target_id][0] += 1
            else:
                paths[0][target_id][1] += 1

            continue

        if neighbor not in history or neighbor in history and len(history[neighbor]) < lmax - 1:
            _compute_all_paths_multitarget(graph, neighbor, targets, history, lmax - 1)

        # After a node has been visited,
        # history[neighbor] is a dictionary with
        # all paths found for lenghts < lmax.
        # Different keys in dict groups paths
        # by their size (in number of nodes).

        if neighbor in history:
            for length in range(len(history[neighbor][:lmax - 1])):
                if graph[source][neighbor]['polarity'] == 1:
                    for target_id in range(len(targets)):
                        paths[length + 1][target_id][0] += history[neighbor][length][target_id][0]
                        paths[length + 1][target_id][1] += history[neighbor][length][target_id][1]
                else:
                    for target_id in range(len(targets)):
                        paths[length + 1][target_id][0] += history[neighbor][length][target_id][1]
                        paths[length + 1][target_id][1] += history[neighbor][length][target_id][0]

    history[source] = paths


def _compute_all_paths_multitarget_dict(graph, source, targets, history, lmax):
    """Compute all paths and store history for each visited node containing number of paths already found.

    :param graph: graph
    :param source: source node
    :param targets: target nodes
    :param lmax: lmax
    :param history:
    """
    if lmax == 0:
        return

    paths = [{} for _ in range(lmax)]
    for neighbor in graph.neighbors(source):
        if graph.nodes[neighbor]['isTarget']:
            if neighbor in targets or not targets:
                paths[0][neighbor] = [0, 0]
                if graph[source][neighbor]['polarity'] == 1:
                    paths[0][neighbor][0] += 1
                else:
                    paths[0][neighbor][1] += 1
            continue

        if neighbor not in history or neighbor in history and len(history[neighbor]) < lmax - 1:
            _compute_all_paths_multitarget_dict(graph, neighbor, targets, history, lmax - 1)

        # After a node has been visited,
        # history[neighbor] is a dictionary with
        # all paths found for lenghts < lmax.
        # Different keys in dict groups paths
        # by their size (in number of nodes).

        if neighbor in history:
            for length in range(len(history[neighbor][:lmax - 1])):
                relation_polarity = graph[source][neighbor]['polarity']
                for target in history[neighbor][length].keys():
                    if target not in paths[length + 1]:
                        paths[length + 1][target] = [0, 0]
                    if relation_polarity == 1:
                        paths[length + 1][target][0] += history[neighbor][length][target][0]
                        paths[length + 1][target][1] += history[neighbor][length][target][1]
                    else:
                        paths[length + 1][target][0] += history[neighbor][length][target][1]
                        paths[length + 1][target][1] += history[neighbor][length][target][0]

    history[source] = paths


def get_paths_to_target(graph, source, target, visited_nodes, history, lmax):
    """Return all paths from source to target avoiding nodes in visitied_nodes.

    Paths are returned as a tuple with
        (1) a pair of integers with the total number of paths source to target
        (2) a dictionary of all nodes and how many nodes cross through them
    """
    if lmax == 0:
        if graph[source][target]['polarity'] == 1:
            return [1, 0], {}
        else:
            return [0, 1], {}

    # pre conditions:
    # 1. len(history[source]) is at least lmax
    # 2. target in history[source][lmax]

    valid_neighbors = []
    for node in history[source][lmax][target][1]:
        if graph.has_edge(source, node):
            valid_neighbors.append(node)

    paths = [0, 0]
    node_count = {}
    for neighbor in valid_neighbors:
        # 1. len(history[neighbor]) is at least lmax-1
        if len(history[neighbor]) < lmax - 1:
            continue
        # 2. target in history[neighbor][lmax - 1]
        if target not in history[neighbor][lmax - 1]:
            continue

        # if neighbor is in list of visited_nodes, we're not interested in its paths.
        if neighbor in visited_nodes:
            continue

        # if neighbor has paths involving any of the visited_nodes,
        # recursive call will return w/o them.
        nodes_in_subpath = [node for node in history[neighbor][lmax - 1][target][1].keys()]
        if any(node in visited_nodes + [source] for node in nodes_in_subpath):
            neighbor_paths, neighbor_node_count = get_paths_to_target(
                graph, neighbor, target, visited_nodes + [source], history, lmax - 1)

        else:
            neighbor_paths = history[neighbor][lmax - 1][target][0]
            neighbor_node_count = history[neighbor][lmax - 1][target][1]

        relation_polarity = graph[source][neighbor]['polarity']
        activation_index = 0 if relation_polarity == 1 else 1
        inhibition_index = 1 if relation_polarity == 1 else 0

        for node, count in neighbor_node_count.items():
            if node not in node_count:
                node_count[node] = [0, 0]

            node_count[node][0] += count[activation_index]
            node_count[node][1] += count[inhibition_index]

        if sum(neighbor_paths):
            if neighbor not in node_count:
                node_count[neighbor] = [0, 0]
            node_count[neighbor][0] += neighbor_paths[activation_index]
            node_count[neighbor][1] += neighbor_paths[inhibition_index]
            paths[0] += neighbor_paths[activation_index]
            paths[1] += neighbor_paths[inhibition_index]

    return paths, node_count


def get_paths_through(graph, source, target, crossnode, visited_nodes, history, lmax):
    """Return all paths crossing crossnode.

    Paths are returned as a tuple with
        (1) a pair of integers with the total number of paths source to target.
        (2) a dictionary of all nodes and how many nodes cross through them

    If source == crossnode, call get_paths_to_target to get all paths from here.
    Otherwise, select neighbors that include crossnode in their history of length lmax - 1
    """
    # if source == crossnode, we need to get all paths but source won't be involved anymore,
    # therefore, we have to switch to get_paths_to_target function
    if source == crossnode:
        paths, node_count = get_paths_to_target(
            graph, source, target, visited_nodes + [source], history, lmax - 1
        )

        return paths, node_count

    valid_neighbors = []
    for node in history[source][lmax][target][1]:
        if graph.has_edge(source, node):
            valid_neighbors.append(node)

    paths = [0, 0]
    node_count = {}
    for neighbor in valid_neighbors:
        # 1. len(history[neighbor]) is at least lmax-1
        if len(history[neighbor]) < lmax - 1:
            continue
        # 2. target in history[neighbor][lmax - 1]
        if target not in history[neighbor][lmax - 1]:
            continue
        # 3. neighbor not in visited_nodes
        if neighbor in visited_nodes:
            continue

        elif neighbor == crossnode:
            neighbor_paths, neighbor_node_count = get_paths_to_target(
                graph, neighbor, target, visited_nodes + [source], history, lmax - 1)

        # crossnode must be in intermediate paths for neighbor to be considered
        elif crossnode not in history[neighbor][lmax - 1][target][1]:
            continue
        else:
            neighbor_paths, neighbor_node_count = get_paths_through(
                graph, neighbor, target, crossnode, visited_nodes + [source], history, lmax - 1)

        relation_polarity = graph[source][neighbor]['polarity']
        activation_index = 0 if relation_polarity == 1 else 1
        inhibition_index = 1 if relation_polarity == 1 else 0

        for node, count in neighbor_node_count.items():
            if node not in node_count:
                node_count[node] = [0, 0]

            node_count[node][0] += count[activation_index]
            node_count[node][1] += count[inhibition_index]

        if sum(neighbor_paths):
            if neighbor not in node_count:
                node_count[neighbor] = [0, 0]
            node_count[neighbor][0] += neighbor_paths[activation_index]
            node_count[neighbor][1] += neighbor_paths[inhibition_index]
            paths[0] += neighbor_paths[activation_index]
            paths[1] += neighbor_paths[inhibition_index]

    return paths, node_count


def _compute_all_simple_paths_multitarget_dict(graph, source, targets, lmax, history, cycle_history={}):
    """Compute all simple paths and store history for each visited node containing number of paths already found.

    :param graph: graph
    :param source: source node
    :param targets: target nodes
    :param lmax: maximum number of edges to find a path
    :param history: maximum number of edges to find a path
    :param cycle_history:
    :return:
    """
    if lmax == 0:
        return

    _history_source = [{} for _ in range(lmax)]
    neighbor_nodes = [neighbor for neighbor in graph.neighbors(source)]

    for neighbor in neighbor_nodes:
        if graph.nodes[neighbor]['isTarget']:
            if neighbor in targets or not targets:
                _history_source[0][neighbor] = [[0, 0], {}]
                if graph[source][neighbor]['polarity'] == 1:
                    _history_source[0][neighbor][0][0] = 1
                else:
                    _history_source[0][neighbor][0][1] = 1

            continue

        if neighbor not in history or (neighbor in history and len(history[neighbor]) < lmax - 1):
            _compute_all_simple_paths_multitarget_dict(
                graph=graph,
                source=neighbor,
                targets=targets,
                lmax=lmax - 1,
                history=history,
                cycle_history=cycle_history,
            )

        if neighbor in history:

            for length in range(len(history[neighbor][:lmax - 1])):

                relation_polarity = graph[source][neighbor]['polarity']
                for target in history[neighbor][length].keys():

                    if target not in _history_source[length + 1]:
                        _history_source[length + 1][target] = [[0, 0], {}]

                    activation_index = 0 if relation_polarity == 1 else 1
                    inhibition_index = 1 if relation_polarity == 1 else 0

                    # Get paths to target
                    paths_to_target = [history[neighbor][length][target][0][0], history[neighbor][length][target][0][1]]

                    # If there're no cycles, intermediate_nodes will not be modified
                    # and there's no need for a costly deepcopy operation
                    intermediate_nodes = history[neighbor][length][target][1]

                    # Get paths starting from neighbor that have source as intermediate node
                    paths_in_cycle = intermediate_nodes.get(source, [0, 0])

                    if paths_in_cycle[0] or paths_in_cycle[1]:
                        # Cycle detected.
                        # Find all paths / nodes involved in cycle so they can be removed
                        # from neighbor's list before being added to source's.
                        nodes_in_cycles = {}
                        number_of_paths_in_cycles = []
                        if (
                            source in cycle_history[0]
                            and neighbor in cycle_history[0][source]
                            and target in cycle_history[0][source][neighbor]
                            and length in cycle_history[0][source][neighbor][target]
                        ):
                            number_of_paths_in_cycles, nodes_in_cycles = cycle_history[0][source][neighbor][target][
                                length]

                        else:
                            number_of_paths_in_cycles, nodes_in_cycles = get_paths_through(
                                graph, neighbor, target, source, [], history, length,
                            )

                            if source not in cycle_history[0]:
                                cycle_history[0][source] = {}
                            if neighbor not in cycle_history[0][source]:
                                cycle_history[0][source][neighbor] = {}
                            if target not in cycle_history[0][source][neighbor]:
                                cycle_history[0][source][neighbor][target] = {}
                            if length not in cycle_history[0][source][neighbor][target]:
                                cycle_history[0][source][neighbor][target][length] = [number_of_paths_in_cycles,
                                                                                      nodes_in_cycles]

                        if target not in cycle_history[1]:
                            cycle_history[1][target] = 0
                        cycle_history[1][target] += 1

                        # Manually copy intermediate_nodes to avoid calling deepcopy
                        intermediate_nodes = {
                            node: [paths[0], paths[1]]
                            for node, paths in history[neighbor][length][target][1].items()
                        }

                        for node, paths in nodes_in_cycles.items():
                            intermediate_nodes[node][0] -= paths[0]
                            intermediate_nodes[node][1] -= paths[1]

                            if intermediate_nodes[node] == [0, 0]:
                                intermediate_nodes.pop(node, None)

                        paths_to_target[0] -= number_of_paths_in_cycles[0]
                        paths_to_target[1] -= number_of_paths_in_cycles[1]

                    # Copy map with path count per node
                    for node, paths in intermediate_nodes.items():
                        if node not in _history_source[length + 1][target][1]:
                            _history_source[length + 1][target][1][node] = [0, 0]

                        _history_source[length + 1][target][1][node][0] += paths[activation_index]
                        _history_source[length + 1][target][1][node][1] += paths[inhibition_index]

                    # Add neighbor to the intermediate node's list
                    if paths_to_target[0] or paths_to_target[1]:
                        _history_source[length + 1][target][1][neighbor] = _history_source[length + 1][target][1].get(
                            neighbor,
                            [0, 0],
                        )
                        _history_source[length + 1][target][1][neighbor][0] += paths_to_target[activation_index]
                        _history_source[length + 1][target][1][neighbor][1] += paths_to_target[inhibition_index]

                    # Update count of paths to target from source
                    _history_source[length + 1][target][0][0] += paths_to_target[activation_index]
                    _history_source[length + 1][target][0][1] += paths_to_target[inhibition_index]

    history[source] = _history_source


[docs]def compute_all_paths_multitarget( graph: DiGraph, source: Iterable[Any], targets, lmax: int, previous_history: Dict, ) -> Tuple[int, List[List[Union[float, int]]]]: """Compute all paths to all the targets, separately. :param graph: graph :param source: source nodes :param targets: target nodes :param lmax: lmax :param previous_history: previous history of visited nodes :return: """ _compute_all_paths_multitarget( graph=graph, source=source, targets=targets, history=previous_history, lmax=lmax, ) # Initialize [relative_activations, relative_inhibitions, number_of_paths_to_target] count = [[0.0, 0.0, 0] for _ in targets] total_number_of_paths = 0 # Loop up in the history the results in each of the paths for t in range(len(targets)): for length in range(lmax): source_to_target = previous_history[source][length][t] count[t][0] += source_to_target[0] count[t][1] += source_to_target[1] count[t][2] += source_to_target[0] + source_to_target[1] total_number_of_paths = _count_paths(count, t, total_number_of_paths) return total_number_of_paths, count
[docs]def compute_all_paths_multitarget_dict( graph: DiGraph, source, targets, lmax: int, previous_history: Dict, cycle_history: Union[Dict, Dict], simple_paths: bool = False, ) -> Tuple[int, List[List[Union[float, int]]]]: """Compute all paths to all the targets, separately. Uses dict to store target path count instead of array. :param graph: graph :param source: source node :param targets: target nodes :param lmax: lmax :param previous_history: history of visited nodes :param cycle_history: history of cycles :param simple_paths: simple paths mode """ if simple_paths: _compute_all_simple_paths_multitarget_dict( graph=graph, source=source, targets=targets, history=previous_history, lmax=lmax, cycle_history=cycle_history, ) else: _compute_all_paths_multitarget_dict( graph=graph, source=source, targets=targets, history=previous_history, lmax=lmax, ) # Initialize [relative_activations, relative_inhibitions, number_of_paths_to_target] count = [[0.0, 0.0, 0] for _ in targets] total_number_of_paths = 0 # Loop up in the history the results in each of the paths for t in range(len(targets)): target_id = targets[t] for length in range(lmax): if target_id not in previous_history[source][length]: continue source_to_target = previous_history[source][length][target_id] if simple_paths: source_to_target = source_to_target[0] count[t][0] += source_to_target[0] count[t][1] += source_to_target[1] count[t][2] += source_to_target[0] + source_to_target[1] total_number_of_paths = _count_paths(count, t, total_number_of_paths) return total_number_of_paths, count
def _count_paths(count, t, total_number_of_paths): """Count ratio and number of paths.""" total_number_of_paths += count[t][2] # Calculate ratio of positive and negative paths count[t][0] = 0 if not count[t][2] else round(count[t][0] / count[t][2], 2) count[t][1] = 0 if not count[t][2] else round(count[t][1] / count[t][2], 2) return total_number_of_paths