Source code for dynast.align

import os
import tempfile
from typing import Dict, List, Optional

import ngs_tools as ngs
from typing_extensions import Literal

from . import config, constants, utils
from .logging import logger
from .technology import Technology


[docs]def STAR_solo( fastqs: List[str], index_dir: str, out_dir: str, technology: Technology, whitelist_path: Optional[str] = None, strand: Literal['forward', 'reverse', 'unstranded'] = 'forward', n_threads: int = 8, temp_dir: Optional[str] = None, nasc: bool = False, overrides: Optional[Dict[str, str]] = None, ) -> Dict[str, str]: """Align FASTQs with STARsolo. Args: fastqs: List of path to FASTQs. Order matters -- STAR assumes the UMI and barcode are in read 2 index_dir: Path to directory containing STAR index out_dir: Path to directory to place STAR output technology: Technology specification whitelist_path: Path to textfile containing barcode whitelist strand: Strandedness of the sequencing protocol may be one of the following: `forward`, `reverse`, `unstranded` n_threads: Number of threads to use temp_dir: STAR temporary directory, defaults to `None`, which uses the system temporary directory nasc: Whether or not to use STAR configuration used in NASC-seq pipeline overrides: STAR command-line argument overrides """ logger.info('Aligning the following FASTQs with STAR') for fastq in fastqs: logger.info((' ' * 8) + fastq) # out_dir must end with a directory separator if not out_dir.endswith(('/', '\\')): out_dir += os.path.sep command = [utils.get_STAR_binary_path()] arguments = config.STAR_ARGUMENTS arguments = utils.combine_arguments(arguments, technology.chemistry.to_starsolo_arguments()) if technology.additional_args: arguments = utils.combine_arguments(arguments, technology.additional_args) if technology.name != 'smartseq': arguments = utils.combine_arguments(arguments, config.STAR_SOLO_ARGUMENTS) if whitelist_path: arguments = utils.combine_arguments(arguments, {'--soloCBwhitelist': whitelist_path}) # Input FASTQs must be plaintext plaintext_fastqs = [] for fastq in fastqs: if ngs.utils.is_gzip(fastq): plaintext_path = utils.mkstemp(dir=temp_dir) logger.warning(f'Decompressing {fastq} to {plaintext_path} because STAR requires plaintext FASTQs') utils.decompress_gzip(fastq, plaintext_path) else: plaintext_path = fastq plaintext_fastqs.append(plaintext_path) arguments['--readFilesIn'] = plaintext_fastqs else: # STAR requires FIFO file support when running in smartseq mode fifo_path = utils.mkstemp(dir=temp_dir, delete=True) try: os.mkfifo(fifo_path) except OSError: raise utils.UnsupportedOSException( f'The filesystem at {temp_dir} does not support FIFO files. ' 'STAR uses FIFO files to run alignment of Smart-seq files.' ) if nasc: arguments = utils.combine_arguments(arguments, config.NASC_ARGUMENTS) manifest_path = utils.mkstemp(dir=temp_dir) with open(fastqs[0], 'r') as f, open(manifest_path, 'w') as out: for line in f: if line.isspace(): continue cell_id, fastq_1, fastq_2 = line.strip().split(',') if fastq_1.endswith('.gz'): plaintext_path = utils.mkstemp(dir=temp_dir) logger.warning( f'Decompressing {fastq_1} to {plaintext_path} because STAR requires plaintext FASTQs' ) utils.decompress_gzip(fastq_1, plaintext_path) fastq_1 = plaintext_path if fastq_2.endswith('.gz'): plaintext_path = utils.mkstemp(dir=temp_dir) logger.warning( f'Decompressing {fastq_2} to {plaintext_path} because STAR requires plaintext FASTQs' ) utils.decompress_gzip(fastq_2, plaintext_path) fastq_2 = plaintext_path out.write(f'{fastq_1}\t{fastq_2}\t{cell_id}\n') arguments['--readFilesManifest'] = manifest_path # Attempt to increase NOFILE limit to maximum possible, and use this to # set n_bins to the maximum possible, as higher n_bin requires less # memory to sort the BAM. current = utils.get_file_descriptor_limit() maximum = min(utils.get_max_file_descriptor_limit(), 100000) arguments = utils.combine_arguments( arguments, { '--soloStrand': strand.capitalize(), '--genomeDir': index_dir, '--runThreadN': n_threads, '--outFileNamePrefix': out_dir, '--outTmpDir': os.path.join(temp_dir, f'{tempfile.gettempprefix()}{next(tempfile._get_candidate_names())}'), '--outBAMsortingBinsN': max((maximum // n_threads) - 10, 50) } ) arguments.update(overrides or {}) logger.debug(f'Increasing maximum number of open file descriptors from {current} to {maximum}') with utils.increase_file_descriptor_limit(maximum): logger.info('Starting alignment') command += utils.arguments_to_list(arguments) utils.run_executable(command) solo_dir = os.path.join(out_dir, constants.STAR_SOLO_DIR) gene_dir = os.path.join(solo_dir, constants.STAR_GENE_DIR) raw_gene_dir = os.path.join(gene_dir, constants.STAR_RAW_DIR) filtered_gene_dir = os.path.join(gene_dir, constants.STAR_FILTERED_DIR) result = { 'bam': os.path.join(out_dir, constants.STAR_BAM_FILENAME), 'gene': { 'raw': { 'barcodes': os.path.join(raw_gene_dir, constants.STAR_BARCODES_FILENAME), 'features': os.path.join(raw_gene_dir, constants.STAR_FEATURES_FILENAME), 'matrix': os.path.join(raw_gene_dir, constants.STAR_MATRIX_FILENAME), }, 'filtered': { 'barcodes': os.path.join(filtered_gene_dir, constants.STAR_BARCODES_FILENAME), 'features': os.path.join(filtered_gene_dir, constants.STAR_FEATURES_FILENAME), 'matrix': os.path.join(filtered_gene_dir, constants.STAR_MATRIX_FILENAME), }, }, } return result
@logger.namespaced('align')
[docs]def align( fastqs: List[str], index_dir: str, out_dir: str, technology: Technology, whitelist_path: Optional[str] = None, strand: Literal['forward', 'reverse', 'unstranded'] = 'forward', n_threads: int = 8, temp_dir: Optional[str] = None, nasc: bool = False, overrides: Optional[Dict[str, str]] = None, ): """Main interface for the `align` command. Args: fastqs: List of path to FASTQs. Order matters -- STAR assumes the UMI and barcode are in read 2 index_dir: Path to directory containing STAR index out_dir: Path to directory to place STAR output technology: Technology specification whitelist_path: Path to textfile containing barcode whitelist strand: Strandedness of the sequencing protocol may be one of the following: `forward`, `reverse`, `unstranded` n_threads: Number of threads to use temp_dir: STAR temporary directory, defaults to `None`, which uses the system temporary directory nasc: Whether or not to use STAR configuration used in NASC-seq pipeline overrides: STAR command-line argument overrides """ os.makedirs(out_dir, exist_ok=True) # Check memory. available_memory = utils.get_available_memory() if available_memory < config.RECOMMENDED_MEMORY: logger.warning( f'There is only {available_memory / (1024 ** 3):.2f} GB of free memory on the machine. ' f'It is highly recommended to have at least {config.RECOMMENDED_MEMORY // (1024 ** 3)} GB ' 'free when running dynast. Continuing may cause dynast to crash with an out-of-memory error.' ) # If whitelist was not provided but one is available, decompress into output # directory. if whitelist_path is None and technology.chemistry.has_whitelist: whitelist_path = os.path.join(out_dir, f'{technology.name}_whitelist.txt') logger.info(f'Copying prepackaged whitelist for technology {technology.name} to {whitelist_path}') utils.decompress_gzip(technology.chemistry.whitelist_path, whitelist_path) STAR_solo_dir = os.path.join(out_dir, constants.STAR_SOLO_DIR) STAR_gene_dir = os.path.join(STAR_solo_dir, constants.STAR_GENE_DIR) STAR_raw_gene_dir = os.path.join(STAR_gene_dir, constants.STAR_RAW_DIR) STAR_filtered_gene_dir = os.path.join(STAR_gene_dir, constants.STAR_FILTERED_DIR) STAR_velocyto_dir = os.path.join(STAR_solo_dir, constants.STAR_VELOCYTO_DIR, constants.STAR_RAW_DIR) # Check if these files exist. If they do, we can skip alignment. STAR_result = { 'bam': os.path.join(out_dir, constants.STAR_BAM_FILENAME), 'gene': { 'raw': { 'barcodes': os.path.join(STAR_raw_gene_dir, constants.STAR_BARCODES_FILENAME), 'features': os.path.join(STAR_raw_gene_dir, constants.STAR_FEATURES_FILENAME), 'matrix': os.path.join(STAR_raw_gene_dir, constants.STAR_MATRIX_FILENAME), }, 'filtered': { 'barcodes': os.path.join(STAR_filtered_gene_dir, constants.STAR_BARCODES_FILENAME), 'features': os.path.join(STAR_filtered_gene_dir, constants.STAR_FEATURES_FILENAME), 'matrix': os.path.join(STAR_filtered_gene_dir, constants.STAR_MATRIX_FILENAME), }, }, 'velocyto': { 'barcodes': os.path.join(STAR_velocyto_dir, constants.STAR_BARCODES_FILENAME), 'features': os.path.join(STAR_velocyto_dir, constants.STAR_FEATURES_FILENAME), 'matrix': os.path.join(STAR_velocyto_dir, constants.STAR_MATRIX_FILENAME), } } STAR_required = utils.flatten_dict_values(STAR_result) skip = utils.all_exists(*STAR_required) if not skip: logger.info(f'STAR binary found at {utils.get_STAR_binary_path()}') STAR_result = STAR_solo( fastqs, index_dir, out_dir, technology, whitelist_path=whitelist_path, strand=strand, n_threads=n_threads, temp_dir=temp_dir, nasc=nasc, overrides=overrides, ) else: logger.info('Alignment files already exist. Provide `--overwrite` to overwrite.')