Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic workflow for basecalling ONT pod5 files. Does not yet include demultiplexing. #61

Open
wants to merge 10 commits into
base: simon-ont-dev
Choose a base branch
from
40 changes: 40 additions & 0 deletions bin/batch_pod5.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@simonleandergrimm can you put this in the appropriate module directory? We're using module binaries now.

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#! /usr/bin/env python

import os
import argparse
import shutil

parser = argparse.ArgumentParser()
parser.add_argument("--batch_size", type=int, default=1024**3) # 1GiB
parser.add_argument("--pod5_dir", type=str)
parser.add_argument("--output_dir", type=str)
args = parser.parse_args()

batches = []
current_batch = []
current_batch_size = 0
batch_num = 0

for fname in os.listdir(args.pod5_dir):
path = os.path.join(args.pod5_dir, fname)
size = os.path.getsize(path)

if current_batch_size + size > args.batch_size:
batch_num += 1
batch_dir = os.path.join(args.output_dir, f"batch_{batch_num:03d}")
os.makedirs(batch_dir, exist_ok=True)
for pod5_file in current_batch:
os.symlink(pod5_file, os.path.join(batch_dir, os.path.basename(pod5_file)))
current_batch = []
current_batch_size = 0

current_batch.append(path)
current_batch_size += size

# Handle the remaining files in the last batch
if current_batch:
batch_num += 1
batch_dir = os.path.join(args.output_dir, f"batch_{batch_num:03d}")
os.makedirs(batch_dir, exist_ok=True)
for pod5_file in current_batch:
os.symlink(pod5_file, os.path.join(batch_dir, os.path.basename(pod5_file)))
38 changes: 38 additions & 0 deletions configs/basecall.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/************************************************
| CONFIGURATION FILE FOR NAO BASECALL WORKFLOW |
************************************************/

params {
mode = "basecall"
debug = true

// Directories
base_dir = "s3://nao-mgs-simon/NAO-ONT-20240905-DCS_RNA2_Test"

// Run parameters
// TODO: Automate getting run name from base_dir
nanopore_run = "NAO-ONT-20240905-DCS_RNA2_Test"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this value is always equivalent to the basename of base_dir, you can do this with Groovy: https://www.nextflow.io/docs/latest/working-with-files.html#getting-file-attributes


// TODO: Add optional paramter for ONT kit

// Files
pod5_dir = "${base_dir}/pod5_small/"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do these need to be in the config file? If they'll always be the same except for the base_dir you can hardcode them in the pipeline and save the user some thinking.

bam_dir = "${base_dir}/bam/"
calls_bam = "${base_dir}/bam/calls.bam"
fastq_file = "${base_dir}/raw/${nanopore_run}.fastq.gz"

// Default values for undefined run.nf parameters. Adding them here because they always get triggered
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be fixed now that I've removed all the addParams statements. Try it again after merging in the changes from master?

r1_suffix = ""
r2_suffix = ""
quality_encoding = "auto"
kraken_memory = "8G"
bbduk_suffix = ""
adapters = ""
}


includeConfig "${projectDir}/configs/containers.config"
includeConfig "${projectDir}/configs/resources.config"
includeConfig "${projectDir}/configs/profiles.config"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some missing standard configs to add once you've merged in master.


process.queue = "slg-basecall-batch-queue"
8 changes: 8 additions & 0 deletions configs/containers.config
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,12 @@ process {
withLabel: fastp {
container = "staphb/fastp:0.23.4"
}
withLabel: dorado {
container = "ontresearch/dorado:latest"
// NB: For now going with latest version, maybe the version switching with new updates will break things in the future.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend locking in a version soon

}
withLabel: samtools {
container = "staphb/samtools:latest"
}

}
4 changes: 4 additions & 0 deletions main.nf
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
include { RUN } from "./workflows/run"
include { RUN2 } from "./workflows/run2"
include { INDEX } from "./workflows/index"
include { BASECALL } from "./workflows/basecall"

// Configure working and output directories
pubDir = "${params.base_dir}/output"
Expand All @@ -13,6 +14,9 @@ workflow {
} else if (params.mode == "run2") {
RUN2()
}
else if (params.mode == "basecall") {
BASECALL()
}
}

output {
Expand Down
46 changes: 46 additions & 0 deletions modules/local/batchPod5/main.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
process BATCH_POD_5 {
label "pandas"
label "batch_pod_5"

input:
path(pod_5_dir)
val batch_size
output:
path('pod_5_batch/*')
// ## batch_pod5.py --batch_size !{batch_size} --pod5_dir !{pod_5_dir} --output_dir pod_5_batch/
shell:
'''
batch_num=0
current_batch_size=0
current_batch=()

for fname in $(ls !{pod_5_dir}); do
path="!{pod_5_dir}/$fname"
size=$(stat -c %s "$path")

if [ $((current_batch_size + size)) -gt !{batch_size} ]; then
batch_num=$((batch_num + 1))
batch_dir="pod_5_batch/batch_$(printf "%03d" $batch_num)"
mkdir -p "$batch_dir"
for pod5_file in "${current_batch[@]}"; do
ln -s "$pod5_file" "$batch_dir/$(basename $pod5_file)"
done
current_batch=()
current_batch_size=0
fi

current_batch+=("$path")
current_batch_size=$((current_batch_size + size))
done

# Handle remaining files in last batch
if [ ${#current_batch[@]} -gt 0 ]; then
batch_num=$((batch_num + 1))
batch_dir="pod_5_batch/batch_$(printf "%03d" $batch_num)"
mkdir -p "$batch_dir"
for pod5_file in "${current_batch[@]}"; do
ln -s "$pod5_file" "$batch_dir/$(basename $pod5_file)"
done
fi
'''
}
45 changes: 45 additions & 0 deletions modules/local/dorado/main.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Basecall Nanopore pod5 files with Dorado
process BASECALL_POD_5 {
label "dorado"
label "basecall"
accelerator 1
memory '8 GB'

input:
path(pod_5_dir)
val kit

output:
path("calls.bam"), emit: bam
path("sequencing_summary.txt"), emit: summary

shell:
'''
# Extract batch number using bash
# batch_num=$(basename !{pod_5_dir} | grep -o '[0-9]\\+') # Disabled in the absence of batching

# Dorado basecalling
dorado basecaller sup !{pod_5_dir} --kit-name !{kit} > calls.bam

dorado summary calls.bam > sequencing_summary.txt
'''
}

// Demultiplex basecalled BAM files
process DEMUX_POD_5 {
label "dorado"
label "demux"
accelerator 1
memory '8 GB'

input:
path(calls_bam)
val kit
output:
path('demultiplexed/*'), emit: demux_bam

shell:
'''
dorado demux --output-dir demultiplexed/ --kit-name !{kit} !{calls_bam}
'''
}
18 changes: 18 additions & 0 deletions modules/local/samtools/main.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
process BAM_TO_FASTQ {
label "samtools"

input:
path(bam_dir)
val nanopore_run
output:
path '*.fastq.gz'

shell:
'''
# Run samtools
for bam in !{bam_dir}/*.bam; do
basename=$(basename "$bam" .bam)
barcode=$(echo $basename | sed 's/b7f847d7a590c4991a770d9fe21324ef21b88a6c_//')
samtools fastq "$bam" | gzip -c > "!{nanopore_run}_${barcode}.fastq.gz" done
'''
}
46 changes: 46 additions & 0 deletions workflows/basecall.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/***********************************************************************************************
| WORKFLOW: BASECALLING NANOPORE SQUIGGLE DATA |
***********************************************************************************************/

import groovy.json.JsonOutput
import java.time.LocalDateTime

/***************************
| MODULES AND SUBWORKFLOWS |
***************************/

include { BATCH_POD_5 } from "../modules/local/batchPod5"
include { BASECALL_POD_5 } from "../modules/local/dorado"
include { DEMUX_POD_5 } from "../modules/local/dorado"
include { BAM_TO_FASTQ } from "../modules/local/samtools"

/*****************
| MAIN WORKFLOWS |
*****************/

// Complete primary workflow
workflow BASECALL {
main:
// Start time
start_time = new Date()
start_time_str = start_time.format("YYYY-MM-dd HH:mm:ss z (Z)")

// Batching
// batch_pod5_ch = BATCH_POD_5(params.pod_5_dir, params.batch_size)
// .flatten()

// Basecalling
bam_ch = BASECALL_POD_5(params.pod_5_dir, params.kit)

// Demultiplexing
demux_ch = DEMUX_POD_5(bam_ch.bam, params.kit)

// Convert to FASTQ
fastq_ch = BAM_TO_FASTQ(demux_ch.demux_bam, params.nanopore_run)



publish:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These need to be compatible with the output structure specified in main.nf (@simonleandergrimm look into this more after merging in master)

fastq_ch >> "raw"
bam_ch.summary >> "summary"
}