Source code for procars.step_modules.compute_pqtree

# -*- coding: utf-8 -*-:
"""
Copyright © Bonsai - LIFL (Université Lille 1, CNRS UMR 8022) and Inria-Lille Nord Europe

contact: aida.ouangraoua@inria.fr, amandine.perrin@inria.fr

This software is a computer program whose purpose is to progressively reconstruct ancestral
gene orders.

This software is governed by the CeCILL-B license under French law and
abiding by the rules of distribution of free software. You can use,
modify and/or redistribute the software under the terms of the CeCILL-B
license as circulated by CEA, CNRS and Inria at the following URL
http://www.cecill.info, or in the LICENCE file at the root directory of this program.

The fact that you are presently reading this means that you have had
knowledge of the CeCILL-B license and that you accept its terms.

---------------------------------------------------

``compute_pqtree`` **module description**:

This module creates a new CAR file, from a number of blocks and a set of adjacencies
between those blocks.
The CAR file has the following format::

    #CAR1
    _Q bloc1 bloc2 bloc3 ... blocn Q_
    #CAR2
    _Q bloc1 -bloc2 ... Q_
    ...

.. moduleauthor:: Aïda Ouangraoua, Amandine PERRIN

February 2014

"""


from procars.utils import utils_IO


[docs]def read_from_block(signed_block, left, right, end): """ From a signed block, and a list of all blocks neighbors, finds a chain of blocks forming a CAR from the given block towards the given *end* (left or right) Parameters ---------- signed_block : int ID of the given block, with its orientation (sign) left : dict Dictionnary such that left[abs(block_num)] = signed block_number of block on the left end of *block_num* (if *block_num* > 0: previous block, else : next block). Left blocks calculated according to the current step adjacency file. If no left block: 0 right : dict Dictionnary such that right[abs(block_num)] = signed block_number of block on the right end of *block_num* (if *block_num* > 0: next block, else : previous block). Right blocks calculated according to the current step adjacency file. If no right block: 0 end : int ``0`` for left extremity of the block, ``1`` for right extremity Returns ------- list *car:* ordered list of signed blocks in the required CAR or empty list if cycle CAR """ # if signed_block = 0 (does not exist), return empty car if signed_block == 0: return [] # else, run all method: # Intialization: current_signed_block = signed_block car = [signed_block] # if left end, the next block to find is on the left (next = left) # and the previous one (used for blocks in negative orientation: next(-1) = -previous(1)) # is on the right (previous = right) if end == 0: nex = left previous = right # if right end, contrary. else: nex = right previous = left # if block_num is positive, find next block[block_num] if(signed_block > 0): current_signed_block = nex[signed_block] # else, find previous block[-block_num] else: # /!\ -right[-signed_block] or -right[signed_block] ? current_signed_block = -previous[-signed_block] # Find all next blocks until block '0' (no more next block) or until a cycle # (current_signed_block == signed_block) while (current_signed_block != 0 and current_signed_block != signed_block): if end == 0: # left end, add new block at the beginning of the car car = [current_signed_block] + car else: # right end, add the new block at the end of the car car += [current_signed_block] if(current_signed_block > 0): current_signed_block = nex[current_signed_block] else: current_signed_block = -previous[-current_signed_block] # If car is a cycle, return an empty car if(current_signed_block == signed_block): car = [] return car
[docs]def init_read_from_block_in_cycle(signed_block, nex_signed_block, next_prev_signed_block, next_prev_discarded_signed_block, end): """ Initialization of the resolution of cycle CARs Checks if the block on the *end* extremity of the given *signed_block* was also found at last step or not. - if it was found and added at last step, keep it - if it was found but discarded in last step, keep it, but also store it in *to_be_discarded_unresolved* in order to remove it if the cycle is still unresolved - if it was not found at last step, remove it: store it to *to_be_discarded_resolved* Parameters ---------- signed_block : int Signed block for which we are looking for the adjacency with its next block. nex_signed_block : int Next block of the given *signed_block*. - If we look on the left end and *signed_block* > 0, *nex_signed_block* = previous[signed_block] - If we look on the left end and *signed_block* < 0, *nex_signed_block* = -next[-signed_block] - If we look on the right end and *signed_block* > 0, *nex_signed_block* = next[signed_block] - If we look on the right end and *signed_block* < 0, *nex_signed_block* = -previous[-signed_block] next_prev_signed_block : int Same as *nex_signed_block* but for adjacencies added at last step next_prev_discarded_signed_block : int Same as *nex_signed_block* but for adjacencies discarded at last step end : int ``0`` for left end, ``1`` for right end Returns ------- tuple *to_be_discarded_unresolved:* List containing adjacencies that we add to the current CAR because they were discarded at the previous step, but that we will remove if we find a cycle CAR again (unresolved cycle) *to_be_discarded_resolved:* List of adjacencies found at this step but not at last step, and hence to remove from the adjacency file *current_signed_block:* New current signed block (next block of *signed_block*) if the adjacency was found in last step, or 0 if it was not. """ to_be_discarded_unresolved = [] to_be_discarded_resolved = [] # If adj (between current block and a next one) was present in last step, just add it if(nex_signed_block == next_prev_signed_block): current_signed_block = nex_signed_block # if adj was discarded in last step, add it, but also store it in # 'to_be_discarded_unresolved' in case of unresolved cycle. elif(nex_signed_block in next_prev_discarded_signed_block or -nex_signed_block in next_prev_discarded_signed_block): current_signed_block = nex_signed_block if end == 0: to_be_discarded_unresolved.append([nex_signed_block, signed_block]) else: to_be_discarded_unresolved.append([signed_block, nex_signed_block]) # if next block of current one was not found at last step, add adj between current block # and next one to 'to_be_discarded_resolved'. else: current_signed_block = 0 if end == 0: to_be_discarded_resolved.append([nex_signed_block, signed_block]) else: to_be_discarded_resolved.append([signed_block, nex_signed_block]) return to_be_discarded_unresolved, to_be_discarded_resolved, current_signed_block
[docs]def read_from_block_in_cycle(signed_block, left, right, left_prev, right_prev, left_prev_discarded, right_prev_discarded, end): """ Finds a non-cyclic CAR From a signed_block, which was found to be in a cycle CAR if we added all adjacencies found at the current step, it tries to find a non cycle CAR by requiring a new condition: add neighbor in the CAR if it was also found as a conserved adjacency at last step (in added adjacencies or discarded ones). Parameters ---------- signed_block : int Number of the given block, with its orientation (sign) left : dict Dictionnary such that left[abs(block_num)] = signed block_number of block on the left end of *block_num* (if *block_num* > 0: previous block, else : next block). Left blocks calculated according to the current step adjacency file. If no left block: 0 right : dict Dictionnary such that right[abs(block_num)] = signed block_number of block on the right end of *block_num* (if *block_num* > 0: next block, else : previous block). Right blocks calculated according to the current step adjacency file. If no right block: 0 left_prev : dict Same as left, but for adjacencies found in last step right_prev : dict Same as right, but for adjacencies found in last step left_prev_discarded : dict For each block number, an array containing its potential left neighbors: left[bloc1] = [bloc2, -bloc3,..]. According to adjacencies discarded at the last step right_prev_discarded : dict For each block number, an array containing its potential right neighbors: right[bloc1] = [-bloc2, bloc3,..]. According to adjacencies discarded at the last step end : int ``0`` for left extremity of the block, ``1`` for right extremity Returns ------- list *car:* ordered list of signed blocks in the required CAR. If the cycle is unresolved, it returns the cycle CAR. Else, it returns the part of the CAR from the block *signed_block* until the *end* extremity of the CAR. *to_be_discarded:* adjacencies to remove from the current set of adjacencies, because of the resolution of the cycle CAR. """ # if signed_block = 0 (does not exist), return empty car if signed_block == 0: return [] # else, run all method: # Intialization: current_signed_block = signed_block car = [signed_block] # if left end, the next block to find is on the left (next = left) # and the previous one (used for blocks in negative orientation: next(-1) = -previous(1)) # is on the right (previous = right) if end == 0: nex = left next_prev = left_prev next_prev_discarded = left_prev_discarded previous = right previous_prev = right_prev previous_prev_discarded = right_prev_discarded # if right end, contrary. else: nex = right next_prev = right_prev next_prev_discarded = right_prev_discarded previous = left previous_prev = left_prev previous_prev_discarded = left_prev_discarded # ### Find CAR # adjacencies to remove from the adjacency file to_be_discarded = [] # Initialization: if signed_block > 0: res = init_read_from_block_in_cycle(signed_block, nex[signed_block], next_prev[signed_block], next_prev_discarded[signed_block], end) else: res = init_read_from_block_in_cycle(signed_block, -previous[-current_signed_block], -previous_prev[-current_signed_block], previous_prev_discarded[-current_signed_block], end) to_be_discarded_unresolved, to_be_discarded_resolved, current_signed_block = res # Loop, until no more adjacency is found to complete the CAR, or until a new cycle CAR. while (current_signed_block != 0 and current_signed_block != signed_block): if end == 0: car = [current_signed_block] + car else: car += [current_signed_block] if(current_signed_block > 0): if(nex[current_signed_block] == next_prev[current_signed_block]): current_signed_block = nex[current_signed_block] elif(nex[current_signed_block] in next_prev_discarded[current_signed_block]): if end == 0: to_be_discarded_unresolved.append([nex[current_signed_block], current_signed_block]) else: to_be_discarded_unresolved.append([current_signed_block, nex[current_signed_block]]) current_signed_block = nex[current_signed_block] else: if end == 0: to_be_discarded_resolved.append([nex[current_signed_block], current_signed_block]) else: to_be_discarded_resolved.append([current_signed_block, nex[current_signed_block]]) current_signed_block = 0 else: if(previous[-current_signed_block] == previous_prev[-current_signed_block]): current_signed_block = -previous[-current_signed_block] elif(previous[-current_signed_block] in previous_prev_discarded[-current_signed_block]): if end == 0: to_be_discarded_unresolved.append([-previous[-current_signed_block], current_signed_block]) else: to_be_discarded_unresolved.append([current_signed_block, -previous[-current_signed_block]]) current_signed_block = -previous[-current_signed_block] else: if end == 0: to_be_discarded_resolved.append([-previous[-current_signed_block], current_signed_block]) else: to_be_discarded_resolved.append([current_signed_block, -previous[-current_signed_block]]) current_signed_block = 0 if(current_signed_block != signed_block): print("...cycle resolved") to_be_discarded = to_be_discarded_resolved else: print("...cycle unresolved") if end == 0: car = [current_signed_block] + car else: car += [current_signed_block] to_be_discarded = to_be_discarded_unresolved return car, to_be_discarded
[docs]def remove_adjacency_from_file(adjacency_file_name, adjacency_list): """ Removes an adjacency in an adjacency file When all conserved non-conflicting adjacencies are found, this module computes the sets of blocks into CARs. If adjacencies added infer a cycle CAR, we eliminate one of these adjacencies (the less reliable) in order to form a linear chromosome. The removed adjacency must then be removed from the adjacency file. Parameters ---------- adjacency_file_name : string Name of the current step adjacency file adjacency_list : list List of lists: adjacencies to remove from the file. """ print("removing adjacencies " + str(adjacency_list) + " from the set") with open(adjacency_file_name, "r") as adjs: adj_file_lines = adjs.readlines() with open(adjacency_file_name, "w") as adj_file_output: for line in adj_file_lines: words = line.split() # if current line adjacency is not the given one, write it if([int(words[0]), int(words[1])] not in adjacency_list and [-int(words[1]), -int(words[0])] not in adjacency_list): adj_file_output.write(line)
[docs]def find_cars(nb_blocks, left, right, adjacency_prefix, step_nb): """ From the list of added adjacencies, finds all CARs to put into the PQtree file Parameters ---------- nb_blocks : int Total number of blocks left : dict Dictionnary such that left[abs(block_num)] = signed block_number of block on the left end of *block_num* (if *block_num* > 0: previous block, else : next block). Left blocks calculated according to the current step adjacency file. If no left block: 0 right : dict Dictionnary such that right[abs(block_num)] = signed block_number of block on the right end of *block_num* (if *block_num* > 0: next block, else : previous block). Right blocks calculated according to the current step adjacency file. If no right block: 0 adjacency_prefix : string Prefix of all adjacencies files step_nb : int Current step of ProCARs method Returns ------- list *genome:* List of lists, such that: all_cars[car_num] = [bloc1, bloc2, ...] = all ordered signed blocks of car number *car_num* """ adjacency_file_name = adjacency_prefix + "_" + str(step_nb) + ".txt" # already_read: array storing blocks already read during the computation of cars: already_read = [] genome = [] for block_id in range(1, nb_blocks + 1): if block_id not in already_read: car_left = read_from_block(block_id, left, right, 0) car_right = read_from_block(block_id, left, right, 1) car = car_left[: -1] + car_right # If we found a cycle (car = []) if len(car) == 0: print("cycle car containing block " + str(block_id)) adjacency_file_prev = adjacency_prefix + "_" + str(step_nb - 1) + ".txt" adjacency_file_prev_discarded = adjacency_file_prev + "_discarded.txt" left_prev, right_prev = utils_IO.read_adjacency_file(adjacency_file_prev, nb_blocks) discard = utils_IO.read_adjacency_file(adjacency_file_prev_discarded, nb_blocks, discarded=True) left_prev_discarded, right_prev_discarded = discard res_left = read_from_block_in_cycle(block_id, left, right, left_prev, right_prev, left_prev_discarded, right_prev_discarded, 0) car_left, to_be_discarded_left = res_left res_right = read_from_block_in_cycle(block_id, left, right, left_prev, right_prev, left_prev_discarded, right_prev_discarded, 1) car_right, to_be_discarded_right = res_right # if cycle was resolved: if(car_left[0] != car_left[-1] or len(car_left) <= 1): car = car_left[: -1] + car_right # remove adjacency not added (because it creates a cycle) remove_adjacency_from_file(adjacency_file_name, to_be_discarded_left + to_be_discarded_right) for signed_block in car: already_read.append(abs(signed_block)) genome.append(car) # cycle unresolved : car_left[0] = car_left[-1] else: car_left = car_left[: -1] car = [] previous_signed_block = 0 for signed_block in car_left: if([previous_signed_block, signed_block] not in to_be_discarded_left): car.append(signed_block) else: for signed_block1 in car: already_read.append(abs(signed_block1)) genome.append(car) car = [signed_block] previous_signed_block = signed_block for signed_block in car: already_read.append(abs(signed_block)) genome.append(car) remove_adjacency_from_file(adjacency_file_name, to_be_discarded_left) else: for signed_block in car: already_read.append(abs(signed_block)) genome.append(car) return genome
[docs]def main(nb_blocks, adjacency_prefix, car_prefix, step_nb): """ Main method, to create the new PQtree file. Parameters ---------- nb_blocks : int Total number of blocks adjacency_prefix : string Prefix of the adjacency file name car_prefix : string Prefix of the PQtree file name step_nb : int Current step of the ProCARs method """ # Store file names, and read adjacency file adjacency_file_name = adjacency_prefix + "_" + str(step_nb) + ".txt" car_file_name = car_prefix + "_" + str(step_nb) + ".txt" left, right = utils_IO.read_adjacency_file(adjacency_file_name, nb_blocks) # Find sets of CARs genome = find_cars(nb_blocks, left, right, adjacency_prefix, step_nb) # Write CARs into the file utils_IO.write_car_file(car_file_name, genome) print("... " + str(len(genome)) + " cars\n")