Source code for diempy.contigs

#likely need to adjust what needs being imported, 
#for local imports, check which functions are being used and whether there are better functions defined elsewhere

import numpy as np
import os
import pandas as pd
# np.set_printoptions(legacy = '1.25')

#import pandas as pd
#import csv
#import time
#import os
#import multiprocessing
# from collections import Counter
# import pickle

# from . import polarize as pol
# from . import smooth as ks

# left and right indices are inclusive. So interval is [l,r] inclusive of both ends.
def get_intervals(chrName, indName, statesList, posList, includeSingle = True):
    '''
    Function to get intervals for a given contig from states and positions. 
    single site intervals can be included or excluded. By default they are included.

    Args:

        chrName (str): Chromosome name.
        indName (str): Individual name.
        statesList (list): List of states at each site for this individual and chromosome.
        posList (list): List of positions.
        mapPosList (list): List of map positions.

    Returns:
        list: List of intervals.
    '''

    lidx = 0
    ridx = 0
    ivls = []

    if len(statesList) == 0:
        return ivls
    
    while ridx <= len(statesList)-1:
        currentState = statesList[lidx]

        if ridx == len(statesList)-1:
            l = posList[lidx]
            r = posList[ridx]
            iv = Interval(chrName, indName, lidx, ridx, l, r, currentState)

            if includeSingle == True:
                ivls.append(iv)
            else:
                if r-l>0: 
                    ivls.append(iv)
            break

        if statesList[ridx+1] == currentState:
            ridx += 1
        else:
            l = posList[lidx]
            r = posList[ridx]

            iv = Interval(chrName, indName, lidx, ridx, l, r, currentState)

            if includeSingle == True:
                ivls.append(iv)
            else:
                if r-l>0: 
                    ivls.append(iv)

            ridx +=1
            lidx=ridx
            
    return ivls




# interval should also add info about (any) sites supporting the interval in addition to the left and right sites defining it
# this should be done in a way to look for gene conversion events later on
[docs] class Interval: ''' Represents a genomic interval for a specific individual and chromosome. Args: chrName (str): Chromosome name. indName (str): Individual name. idxl (int): Left index (inclusive). idxr (int): Right index (inclusive). So slice of state matrix would be [idxl:idxr+1] l (float): Left position (physical). r (float): Right position (physical). state (int): State of the interval. :ivar str chrName: Chromosome name. :ivar str indName: Individual name. :ivar int idxl: Left index (inclusive). :ivar int idxr: Right index (inclusive). So slice of state matrix would be [idxl:idxr+1] :ivar float l: Left position (physical). :ivar float r: Right position (physical). :ivar int state: State of the interval. ''' def __init__(self,chrName,indName,idxl,idxr,l,r,state): self.chrName = chrName self.indName = indName self.idxl = idxl self.idxr = idxr self.l = l self.r = r self.state = state def info(self): print(f"chr = {self.chrName}, ind = {self.indName}, idxl = {self.idxl}, idxr = {self.idxr}, l = {self.l}, r = {self.r}, state = {self.state}") def span(self): return self.r - self.l def mapSpan(self,chrLength): return (self.r - self.l)/chrLength # in on a 'unit map length' scale, i.e. 0 to 1
#the chromosome class contains the haplotype structure of a single chromosome. # the individual and chromosome are indexedIt contains # for a single individual and single chromosome. Maybe 'individualChromsome' would be a better name? # the individual is simply a list of the intervals (as defined above)
[docs] class Contig: ''' Represents a contiguous sequence of genomic intervals for a specific individual and chromosome. Args: chrName (str): Chromosome name. indName (str): Individual name. intervalList (list): List of Interval objects. :ivar str chr: Chromosome name. :ivar str ind: Individual name. :ivar int num_intervals: Number of intervals. :ivar list intervals: List of Interval objects. ''' # individual is a list of intervals pertaining to a single chromosome def __init__(self,chrName=None,indName=None,intervalList=None): self.intervals = intervalList if self.intervals is None: self.num_intervals = 0 else: self.num_intervals = len(self.intervals) self.indName = indName self.chrName = chrName # self.getZeroIntervals() # self.getOneIntervals() # self.getTwoIntervals() # self.getThreeIntervals() def printIntervals(self,lim=10): print("formatting is as follows [leftPosition,rightPosition,state]") if lim is None: print([[x.l,x.r,x.state] for x in self.intervals]) else: print([[x.l,x.r,x.state] for x in self.intervals[0:min(self.num_intervals,lim)]]) def get_my_intervals_of_state(self,state): ivs = [] for x in self.intervals: if x.state == state: ivs.append(x) return ivs
def pack_interval(interval): ''' Packs an Interval object into a dictionary. Args: interval (Interval): Interval object to be packed. Returns: dict: Dictionary representation of the Interval object. ''' return interval.__dict__ def unpack_interval(d): ''' Unpacks a dictionary into an Interval object. Args: d (dict): Dictionary representation of an Interval object. Returns: Interval: Unpacked Interval object. ''' blankArgs = [None for _ in range(7)] i = Interval(*blankArgs) i.__dict__.update(d) return i def pack_intervalList(ivl): ''' Packs a list of Interval objects into a list of dictionaries. Args: ivl (list): List of Interval objects to be packed. Returns: list: List of dictionary representations of the Interval objects. ''' return [pack_interval(iv) for iv in ivl] def unpack_intervalList(dlist): ''' Unpacks a list of dictionaries into a list of Interval objects. Args: dlist (list): List of dictionary representations of Interval objects. Returns: list: List of unpacked Interval objects. ''' return [unpack_interval(d) for d in dlist] def pack_contig(contig): ''' Packs a Contig object into a dictionary. This requires 'packing' the list of Interval objects as well. Args: contig (Contig): Contig object to be packed. Returns: dict: Dictionary representation of the Contig object. ''' d = contig.__dict__.copy() d['intervals'] = pack_intervalList(contig.intervals) return d def unpack_contig(d): contig = Contig() contig.__dict__.update(d) contig.intervals = unpack_intervalList(d['intervals']) return contig def pack_contig_matrix(cArr): ''' Packs a Matrix of Contig objects into a matrix of dictionaries. Matrix is (num_chromosomes, num_individuals) in sort order of the diemtype parent object. Args: cArr (np.array dtype=object): Matrix of Contig objects to be packed. Returns: list: List of dictionary representations of the Contig objects. ''' return [[pack_contig(c) for c in row] for row in cArr] def unpack_contig_matrix(dArr): ''' Unpacks a Matrix of dictionaries into a matrix of Contig objects. Matrix is (num_chromosomes, num_individuals) in sort order of the diemtype parent object. Args: dArr (list): List of dictionary representations of Contig objects. Returns: np.array dtype=object: Matrix of unpacked Contig objects. ''' return np.array([[unpack_contig(d) for d in row] for row in dArr], dtype=object) def build_contig_matrix(diemType,includeSingle = True): ''' Creates a matrix of Contig objects from a DiemType object. Matrix is (num_chromosomes, num_individuals) in sort order of the diemtype parent object. Args: diemType (DiemType): DiemType object from which to create the Contig matrix. Returns: np.array dtype=object: Matrix of Contig objects. ''' nChrs = len(diemType.chrNames) nInds = len(diemType.indNames) cArr = np.empty((nChrs, nInds), dtype=object) for cIdx in range(nChrs): chrName = diemType.chrNames[cIdx] for indIdx in range(nInds): indName = diemType.indNames[indIdx] statesList = diemType.DMBC[cIdx][indIdx] posList = diemType.posByChr[cIdx] ivl = get_intervals(chrName, indName, statesList, posList, includeSingle=includeSingle) contig = Contig(chrName, indName, ivl) cArr[cIdx, indIdx] = contig return cArr
[docs] def export_contigs_to_ind_bed_files(diemType, outputDir): ''' Exports contig intervals to BED files for each individual. Args: diemType (DiemType): DiemType object containing contig data. outputDir (str): Directory where BED files will be saved. ''' nChrs = len(diemType.chrNames) nInds = len(diemType.indNames) # Ensure output directory exists os.makedirs(outputDir, exist_ok=True) for indIdx in range(nInds): indName = diemType.indNames[indIdx] bedFilePath = os.path.join(outputDir, f"{indName}_contigs.bed") data = [] with open(bedFilePath, 'w') as bedFile: for cIdx in range(nChrs): thisContig = diemType.contigMatrix[cIdx][indIdx] for ivl in thisContig.intervals: data.append([ivl.chrName, ivl.l-1, ivl.r-1, ivl.state]) # BED format is 0-based start, 1-based end dfInd = pd.DataFrame(data, columns=["chrom", "start", "end", "state"]) dfInd.to_csv(bedFilePath, sep="\t", header=True, index=False)
# This version does 'U' for state 0, and '0','1','2' for states 1,2,3 # def export_contigs_to_ind_bed_files(diemType, outputDir): # ''' # Exports contig intervals to BED files for each individual. # Args: # diemType (DiemType): DiemType object containing contig data. # outputDir (str): Directory where BED files will be saved. # ''' # nChrs = len(diemType.chrNames) # nInds = len(diemType.indNames) # # Ensure output directory exists # os.makedirs(outputDir, exist_ok=True) # for indIdx in range(nInds): # indName = diemType.indNames[indIdx] # bedFilePath = os.path.join(outputDir, f"{indName}_contigs.bed") # data = [] # with open(bedFilePath, 'w') as bedFile: # for cIdx in range(nChrs): # thisContig = diemType.contigMatrix[cIdx][indIdx] # # change diemtype state vaules of 0,1,2,3 to 'U', '0', '1', '2' # for ivl in thisContig.intervals: # state = ivl.state # if state == 0: # state = 'U' # else: # state = str(state - 1) # Convert 1,2,3 to '0','1','2' # data.append([ivl.chrName, ivl.l-1, ivl.r-1, state]) # BED format is 0-based start, 1-based end # dfInd = pd.DataFrame(data, columns=["chrom", "start", "end", "state"]) # dfInd.to_csv(bedFilePath, sep="\t", header=True, index=False)