# -*- coding: utf-8 -*-:
"""
Copyright © Bonsai - LIFL (Université Lille 1, CNRS UMR 8022) and Inria-Lille Nord Europe
contact: aida.ouangraoua@inria.fr, amandine.perrin@inria.fr
This software is a computer program whose purpose is to progressively reconstruct ancestral
gene orders.
This software is governed by the CeCILL-B license under French law and
abiding by the rules of distribution of free software. You can use,
modify and/or redistribute the software under the terms of the CeCILL-B
license as circulated by CEA, CNRS and Inria at the following URL
http://www.cecill.info, or in the LICENCE file at the root directory of this program.
The fact that you are presently reading this means that you have had
knowledge of the CeCILL-B license and that you accept its terms.
---------------------------------------------------
``compute_pqtree`` **module description**:
This module creates a new CAR file, from a number of blocks and a set of adjacencies
between those blocks.
The CAR file has the following format::
#CAR1
_Q bloc1 bloc2 bloc3 ... blocn Q_
#CAR2
_Q bloc1 -bloc2 ... Q_
...
.. moduleauthor:: Aïda Ouangraoua, Amandine PERRIN
February 2014
"""
from procars.utils import utils_IO
[docs]def read_from_block(signed_block, left, right, end):
""" From a signed block, and a list of all blocks neighbors, finds a chain of blocks
forming a CAR from the given block towards the given *end* (left or right)
Parameters
----------
signed_block : int
ID of the given block, with its orientation (sign)
left : dict
Dictionnary such that left[abs(block_num)] = signed block_number of block on the left end of
*block_num* (if *block_num* > 0: previous block, else : next block). Left blocks calculated
according to the current step adjacency file. If no left block: 0
right : dict
Dictionnary such that right[abs(block_num)] = signed block_number of block on the right end
of *block_num* (if *block_num* > 0: next block, else : previous block). Right blocks
calculated according to the current step adjacency file. If no right block: 0
end : int
``0`` for left extremity of the block, ``1`` for right extremity
Returns
-------
list
*car:* ordered list of signed blocks in the required CAR or empty list if cycle CAR
"""
# if signed_block = 0 (does not exist), return empty car
if signed_block == 0:
return []
# else, run all method:
# Intialization:
current_signed_block = signed_block
car = [signed_block]
# if left end, the next block to find is on the left (next = left)
# and the previous one (used for blocks in negative orientation: next(-1) = -previous(1))
# is on the right (previous = right)
if end == 0:
nex = left
previous = right
# if right end, contrary.
else:
nex = right
previous = left
# if block_num is positive, find next block[block_num]
if(signed_block > 0):
current_signed_block = nex[signed_block]
# else, find previous block[-block_num]
else:
# /!\ -right[-signed_block] or -right[signed_block] ?
current_signed_block = -previous[-signed_block]
# Find all next blocks until block '0' (no more next block) or until a cycle
# (current_signed_block == signed_block)
while (current_signed_block != 0 and current_signed_block != signed_block):
if end == 0:
# left end, add new block at the beginning of the car
car = [current_signed_block] + car
else:
# right end, add the new block at the end of the car
car += [current_signed_block]
if(current_signed_block > 0):
current_signed_block = nex[current_signed_block]
else:
current_signed_block = -previous[-current_signed_block]
# If car is a cycle, return an empty car
if(current_signed_block == signed_block):
car = []
return car
[docs]def init_read_from_block_in_cycle(signed_block, nex_signed_block, next_prev_signed_block,
next_prev_discarded_signed_block, end):
""" Initialization of the resolution of cycle CARs
Checks if the block on the *end* extremity of the given *signed_block* was also found at last
step or not.
- if it was found and added at last step, keep it
- if it was found but discarded in last step, keep it, but also store it in
*to_be_discarded_unresolved* in order to remove it if the cycle is still unresolved
- if it was not found at last step, remove it: store it to *to_be_discarded_resolved*
Parameters
----------
signed_block : int
Signed block for which we are looking for the adjacency with its next block.
nex_signed_block : int
Next block of the given *signed_block*.
- If we look on the left end and *signed_block* > 0, *nex_signed_block* =
previous[signed_block]
- If we look on the left end and *signed_block* < 0, *nex_signed_block* =
-next[-signed_block]
- If we look on the right end and *signed_block* > 0, *nex_signed_block* =
next[signed_block]
- If we look on the right end and *signed_block* < 0, *nex_signed_block* =
-previous[-signed_block]
next_prev_signed_block : int
Same as *nex_signed_block* but for adjacencies added at last step
next_prev_discarded_signed_block : int
Same as *nex_signed_block* but for adjacencies discarded at last step
end : int
``0`` for left end, ``1`` for right end
Returns
-------
tuple
*to_be_discarded_unresolved:* List containing adjacencies that we add to the current
CAR because they were discarded at the previous step, but that we will remove if we find
a cycle CAR again (unresolved cycle)
*to_be_discarded_resolved:* List of adjacencies found at this step but not at last step,
and hence to remove from the adjacency file
*current_signed_block:* New current signed block (next block of *signed_block*) if the
adjacency was found in last step, or 0 if it was not.
"""
to_be_discarded_unresolved = []
to_be_discarded_resolved = []
# If adj (between current block and a next one) was present in last step, just add it
if(nex_signed_block == next_prev_signed_block):
current_signed_block = nex_signed_block
# if adj was discarded in last step, add it, but also store it in
# 'to_be_discarded_unresolved' in case of unresolved cycle.
elif(nex_signed_block in next_prev_discarded_signed_block
or -nex_signed_block in next_prev_discarded_signed_block):
current_signed_block = nex_signed_block
if end == 0:
to_be_discarded_unresolved.append([nex_signed_block, signed_block])
else:
to_be_discarded_unresolved.append([signed_block, nex_signed_block])
# if next block of current one was not found at last step, add adj between current block
# and next one to 'to_be_discarded_resolved'.
else:
current_signed_block = 0
if end == 0:
to_be_discarded_resolved.append([nex_signed_block, signed_block])
else:
to_be_discarded_resolved.append([signed_block, nex_signed_block])
return to_be_discarded_unresolved, to_be_discarded_resolved, current_signed_block
[docs]def read_from_block_in_cycle(signed_block, left, right, left_prev, right_prev,
left_prev_discarded, right_prev_discarded, end):
""" Finds a non-cyclic CAR
From a signed_block, which was found to be in a cycle CAR if we added all adjacencies
found at the current step, it tries to find a non cycle CAR
by requiring a new condition: add neighbor in the CAR if it was also found as a conserved
adjacency at last step (in added adjacencies or discarded ones).
Parameters
----------
signed_block : int
Number of the given block, with its orientation (sign)
left : dict
Dictionnary such that left[abs(block_num)] = signed block_number of block on the left end of
*block_num* (if *block_num* > 0: previous block, else : next block). Left blocks calculated
according to the current step adjacency file. If no left block: 0
right : dict
Dictionnary such that right[abs(block_num)] = signed block_number of block on the right end
of *block_num* (if *block_num* > 0: next block, else : previous block). Right blocks
calculated according to the current step adjacency file. If no right block: 0
left_prev : dict
Same as left, but for adjacencies found in last step
right_prev : dict
Same as right, but for adjacencies found in last step
left_prev_discarded : dict
For each block number, an array containing its potential left neighbors:
left[bloc1] = [bloc2, -bloc3,..]. According to adjacencies discarded at the last step
right_prev_discarded : dict
For each block number, an array containing its potential right neighbors:
right[bloc1] = [-bloc2, bloc3,..]. According to adjacencies discarded at the last step
end : int
``0`` for left extremity of the block, ``1`` for right extremity
Returns
-------
list
*car:* ordered list of signed blocks in the required CAR. If the cycle is unresolved, it
returns the cycle CAR. Else, it returns the part of the CAR from the block *signed_block*
until the *end* extremity of the CAR.
*to_be_discarded:* adjacencies to remove from the current set of adjacencies, because of the
resolution of the cycle CAR.
"""
# if signed_block = 0 (does not exist), return empty car
if signed_block == 0:
return []
# else, run all method:
# Intialization:
current_signed_block = signed_block
car = [signed_block]
# if left end, the next block to find is on the left (next = left)
# and the previous one (used for blocks in negative orientation: next(-1) = -previous(1))
# is on the right (previous = right)
if end == 0:
nex = left
next_prev = left_prev
next_prev_discarded = left_prev_discarded
previous = right
previous_prev = right_prev
previous_prev_discarded = right_prev_discarded
# if right end, contrary.
else:
nex = right
next_prev = right_prev
next_prev_discarded = right_prev_discarded
previous = left
previous_prev = left_prev
previous_prev_discarded = left_prev_discarded
# ### Find CAR
# adjacencies to remove from the adjacency file
to_be_discarded = []
# Initialization:
if signed_block > 0:
res = init_read_from_block_in_cycle(signed_block, nex[signed_block],
next_prev[signed_block],
next_prev_discarded[signed_block], end)
else:
res = init_read_from_block_in_cycle(signed_block, -previous[-current_signed_block],
-previous_prev[-current_signed_block],
previous_prev_discarded[-current_signed_block], end)
to_be_discarded_unresolved, to_be_discarded_resolved, current_signed_block = res
# Loop, until no more adjacency is found to complete the CAR, or until a new cycle CAR.
while (current_signed_block != 0 and current_signed_block != signed_block):
if end == 0:
car = [current_signed_block] + car
else:
car += [current_signed_block]
if(current_signed_block > 0):
if(nex[current_signed_block] == next_prev[current_signed_block]):
current_signed_block = nex[current_signed_block]
elif(nex[current_signed_block] in next_prev_discarded[current_signed_block]):
if end == 0:
to_be_discarded_unresolved.append([nex[current_signed_block],
current_signed_block])
else:
to_be_discarded_unresolved.append([current_signed_block,
nex[current_signed_block]])
current_signed_block = nex[current_signed_block]
else:
if end == 0:
to_be_discarded_resolved.append([nex[current_signed_block],
current_signed_block])
else:
to_be_discarded_resolved.append([current_signed_block,
nex[current_signed_block]])
current_signed_block = 0
else:
if(previous[-current_signed_block] == previous_prev[-current_signed_block]):
current_signed_block = -previous[-current_signed_block]
elif(previous[-current_signed_block] in previous_prev_discarded[-current_signed_block]):
if end == 0:
to_be_discarded_unresolved.append([-previous[-current_signed_block],
current_signed_block])
else:
to_be_discarded_unresolved.append([current_signed_block,
-previous[-current_signed_block]])
current_signed_block = -previous[-current_signed_block]
else:
if end == 0:
to_be_discarded_resolved.append([-previous[-current_signed_block],
current_signed_block])
else:
to_be_discarded_resolved.append([current_signed_block,
-previous[-current_signed_block]])
current_signed_block = 0
if(current_signed_block != signed_block):
print("...cycle resolved")
to_be_discarded = to_be_discarded_resolved
else:
print("...cycle unresolved")
if end == 0:
car = [current_signed_block] + car
else:
car += [current_signed_block]
to_be_discarded = to_be_discarded_unresolved
return car, to_be_discarded
[docs]def remove_adjacency_from_file(adjacency_file_name, adjacency_list):
""" Removes an adjacency in an adjacency file
When all conserved non-conflicting adjacencies are found, this module computes the sets of
blocks into CARs. If adjacencies added infer a cycle CAR, we eliminate one of these
adjacencies (the less reliable) in order to form a linear chromosome. The removed adjacency
must then be removed from the adjacency file.
Parameters
----------
adjacency_file_name : string
Name of the current step adjacency file
adjacency_list : list
List of lists: adjacencies to remove from the file.
"""
print("removing adjacencies " + str(adjacency_list) + " from the set")
with open(adjacency_file_name, "r") as adjs:
adj_file_lines = adjs.readlines()
with open(adjacency_file_name, "w") as adj_file_output:
for line in adj_file_lines:
words = line.split()
# if current line adjacency is not the given one, write it
if([int(words[0]), int(words[1])] not in adjacency_list
and [-int(words[1]), -int(words[0])] not in adjacency_list):
adj_file_output.write(line)
[docs]def find_cars(nb_blocks, left, right, adjacency_prefix, step_nb):
""" From the list of added adjacencies, finds all CARs to put into the PQtree file
Parameters
----------
nb_blocks : int
Total number of blocks
left : dict
Dictionnary such that left[abs(block_num)] = signed block_number of block on the left end of
*block_num* (if *block_num* > 0: previous block, else : next block). Left blocks calculated
according to the current step adjacency file. If no left block: 0
right : dict
Dictionnary such that right[abs(block_num)] = signed block_number of block on the right end
of *block_num* (if *block_num* > 0: next block, else : previous block). Right blocks
calculated according to the current step adjacency file. If no right block: 0
adjacency_prefix : string
Prefix of all adjacencies files
step_nb : int
Current step of ProCARs method
Returns
-------
list
*genome:* List of lists, such that:
all_cars[car_num] = [bloc1, bloc2, ...] = all ordered signed blocks of car number *car_num*
"""
adjacency_file_name = adjacency_prefix + "_" + str(step_nb) + ".txt"
# already_read: array storing blocks already read during the computation of cars:
already_read = []
genome = []
for block_id in range(1, nb_blocks + 1):
if block_id not in already_read:
car_left = read_from_block(block_id, left, right, 0)
car_right = read_from_block(block_id, left, right, 1)
car = car_left[: -1] + car_right
# If we found a cycle (car = [])
if len(car) == 0:
print("cycle car containing block " + str(block_id))
adjacency_file_prev = adjacency_prefix + "_" + str(step_nb - 1) + ".txt"
adjacency_file_prev_discarded = adjacency_file_prev + "_discarded.txt"
left_prev, right_prev = utils_IO.read_adjacency_file(adjacency_file_prev,
nb_blocks)
discard = utils_IO.read_adjacency_file(adjacency_file_prev_discarded,
nb_blocks, discarded=True)
left_prev_discarded, right_prev_discarded = discard
res_left = read_from_block_in_cycle(block_id, left, right, left_prev, right_prev,
left_prev_discarded, right_prev_discarded, 0)
car_left, to_be_discarded_left = res_left
res_right = read_from_block_in_cycle(block_id, left, right, left_prev, right_prev,
left_prev_discarded, right_prev_discarded, 1)
car_right, to_be_discarded_right = res_right
# if cycle was resolved:
if(car_left[0] != car_left[-1] or len(car_left) <= 1):
car = car_left[: -1] + car_right
# remove adjacency not added (because it creates a cycle)
remove_adjacency_from_file(adjacency_file_name,
to_be_discarded_left + to_be_discarded_right)
for signed_block in car:
already_read.append(abs(signed_block))
genome.append(car)
# cycle unresolved : car_left[0] = car_left[-1]
else:
car_left = car_left[: -1]
car = []
previous_signed_block = 0
for signed_block in car_left:
if([previous_signed_block, signed_block] not in to_be_discarded_left):
car.append(signed_block)
else:
for signed_block1 in car:
already_read.append(abs(signed_block1))
genome.append(car)
car = [signed_block]
previous_signed_block = signed_block
for signed_block in car:
already_read.append(abs(signed_block))
genome.append(car)
remove_adjacency_from_file(adjacency_file_name, to_be_discarded_left)
else:
for signed_block in car:
already_read.append(abs(signed_block))
genome.append(car)
return genome
[docs]def main(nb_blocks, adjacency_prefix, car_prefix, step_nb):
"""
Main method, to create the new PQtree file.
Parameters
----------
nb_blocks : int
Total number of blocks
adjacency_prefix : string
Prefix of the adjacency file name
car_prefix : string
Prefix of the PQtree file name
step_nb : int
Current step of the ProCARs method
"""
# Store file names, and read adjacency file
adjacency_file_name = adjacency_prefix + "_" + str(step_nb) + ".txt"
car_file_name = car_prefix + "_" + str(step_nb) + ".txt"
left, right = utils_IO.read_adjacency_file(adjacency_file_name, nb_blocks)
# Find sets of CARs
genome = find_cars(nb_blocks, left, right, adjacency_prefix, step_nb)
# Write CARs into the file
utils_IO.write_car_file(car_file_name, genome)
print("... " + str(len(genome)) + " cars\n")