
New directive metarule for composable sub-DAG units with explicit I/O interfaces #4187

@Hocnonsense

Description


This proposes a new top-level directive, metarule, which allows a group of rules to be declared as a named, reusable DAG fragment with explicit input/output interfaces.
A metarule behaves like a rule from the perspective of the rest of the workflow: it participates in wildcard inference, and integrates transparently into the global DAG — but internally it expands into a subgraph of jobs rather than a single command.

Minimal example:

metarule align_pipeline:
    output:
        index = "aligned/{sample}.sorted.bam.bai",
    expose:
        bam   = jobs.j_bam_sort.output[0],
    subdag:
        j_trim = rules.trim(sample=wildcards.sample)
        j_sort = j_trim >> rules.bowtie2 >> rules.bam_sort
        j_sort.named("j_bam_sort")
        j_sort >> rules.bam_index
Full proposal

Motivation

Large workflows are typically composed of well-defined processing stages (e.g. trimming → alignment → sorting), but Snakemake currently offers no way to express this structure explicitly. The existing mechanisms fall short in different ways:

  • module / use rule: file-level reuse across Snakefiles; no explicit I/O contract, wildcard rewriting is verbose, and rule names can conflict.
  • checkpoint: designed for runtime-unknown fan-out; it cannot declare a static sub-DAG.
  • Flat rule namespace: all rules share a single namespace; intermediate files and their producers are fully visible and entangled with the rest of the workflow.

metarule addresses the case where:

  • A fixed sequence (or DAG) of rules always runs together as a logical unit.
  • The author wants to document a clean interface (named inputs/outputs) and optionally hide internal intermediate files.

Proposed Syntax

metarule align_pipeline:
    input:                                     # optional; if declared, must be exhaustive
        fastq = "raw/{sample}.fastq.gz",
    output:                                    # required when wildcards are present
        bam   = "aligned/{sample}.sorted.bam",
        index = "aligned/{sample}.sorted.bam.bai",
    expose:                                    # optional; selectively expose internal outputs
        fq1   = jobs.trim_r1.output[0],        # default cancels any `temp` annotation
        fq2   = temp(jobs.trim_r2.output[0]),  # temp() explicitly declares it
    wildcard_constraints:                      # optional; same semantics as in rule
        sample = r"[A-Za-z0-9_]+",
    subdag:                                    # required; analogous to run: in rule
        # wildcards, input, output injected implicitly

        # Explicit construction: caller supplies wildcards
        j_trim_r1 = rules.trim(sample=wildcards.sample, read="R1").named("trim_r1")
        j_trim_r2 = rules.trim(sample=wildcards.sample, read="R2").named("trim_r2")

        # Forward chaining: wildcards inferred left-to-right
        j_merge = j_trim_r1 >> rules.merge
        j_merge << j_trim_r2                  # converge second input

        # Reverse inference from declared output boundary
        j_sort = output.bam << rules.sort_index

        # Connect and name for expose:
        (j_merge >> j_sort).named("j_sort")
        j_sort >> rules.bam_index

Directive Reference

| Directive             | Required                        | Semantics                                                           |
|-----------------------|---------------------------------|---------------------------------------------------------------------|
| subdag:               | Required                        | Python function body; wildcards, input, output injected implicitly  |
| output:               | Required when wildcards present | Pattern strings; the metarule's external output interface           |
| input:                | Optional                        | If declared, must be exhaustive; constrains allowed external inputs |
| expose:               | Optional                        | Named aliases for internal (non-terminal) job outputs               |
| wildcard_constraints: | Optional                        | Same semantics as in rule                                           |

Directives belonging to individual jobs (threads:, resources:, conda:,
container:, etc.) are not applicable at the metarule level.


subdag: Block: Job Construction and Chaining

Constructing a Job

rules.foo(...) creates a Job object with the given wildcards. .named() assigns
an in-subdag identifier used by expose: references.

j = rules.trim(sample=wildcards.sample, read="R1")
# makes the job referenceable from `expose:`
j.named("trim_r1")
# or equivalently:
j = rules.trim(sample=wildcards.sample, read="R1").named("trim_r1")

>> and << Operators

Both operators can instantiate a new Job for a Rule operand by inferring its
wildcards from a known file or job, and then register a dependency edge.
The key distinction is which side of each rule is used for matching:

j_a        >> rules.x  → j_a.output matched against rules.x.input  patterns
j_a        << rules.x  → j_a.input  matched against rules.x.output patterns
output.sth << rules.x  → output.sth matched against rules.x.output patterns

In all cases the return value is the newly created Job for rules.x. A Rule
object cannot be on the left-hand side of either operator.

After wildcard inference, if the target Job already exists (same rule and same
wildcards) from a prior operation, they will be unified.
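The unification rule above can be sketched in plain Python. This is a hypothetical illustration, not Snakemake's implementation: a registry keyed by (rule name, wildcard values) returns the existing Job whenever an operator would re-create it, so edges merge instead of duplicating nodes.

```python
# Hypothetical sketch of job unification: same rule + same wildcards
# always resolves to the same Job object.

class Job:
    def __init__(self, rule_name, wildcards):
        self.rule_name = rule_name
        self.wildcards = dict(wildcards)
        self.deps = set()  # upstream dependency edges

class JobRegistry:
    def __init__(self):
        self._jobs = {}

    def get_or_create(self, rule_name, wildcards):
        # wildcards sorted into a hashable key so dict order is irrelevant
        key = (rule_name, tuple(sorted(wildcards.items())))
        if key not in self._jobs:
            self._jobs[key] = Job(rule_name, wildcards)
        return self._jobs[key]

registry = JobRegistry()
a = registry.get_or_create("merge", {"sample": "S1"})
b = registry.get_or_create("merge", {"sample": "S1"})
assert a is b  # unified: subsequent edges attach to the same node
```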

Chaining examples:

# Forward chain: infer left-to-right, return rightmost Job
j_sort = rules.trim(sample=wildcards.sample) >> rules.bowtie2 >> rules.bam_sort

# Reverse chain from declared output boundary
j_bowtie2 = output.bam << rules.sort_index << rules.bowtie2

# Converge two inputs onto one job
j_merge = j_trim_r1 >> rules.merge   # infers sample=S1
j_merge << j_trim_r2                 # also infers sample=S1, consistent ✓

# Connect two already-resolved Jobs: only registers the edge
j_a >> j_b

Slot matching tries each pattern of the target rule's input (for >>) or output (for <<) against the source file path or job's output/input, and binds the wildcards from the matched pattern. Multiple matches raise a WorkflowError.
For explicit slot selection, use the low-level API:

# explicit low-level API (see Appendix):
rules.sort_index.match(j_a, slot=lambda s: s["bam"], match_on="input")

# if not too complex, operator with named slot:
j_a.output["bam"] >> rules.sort_index
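The slot-matching step above can be illustrated with a small standalone sketch. This is not Snakemake's actual matcher; it assumes hypothetical helper names (`pattern_to_regex`, `match_slots`) and shows how each `{wildcard}` pattern could be tried against a concrete path, binding wildcards and raising on ambiguity.

```python
import re

def pattern_to_regex(pattern):
    """Convert e.g. "aligned/{sample}.bam" into a named-group regex."""
    escaped = re.escape(pattern)
    # re.escape turns `{sample}` into `\{sample\}`; rewrite it as a group
    regex = re.sub(r"\\\{(\w+)\\\}", r"(?P<\1>[^/]+)", escaped)
    return re.compile(regex + r"$")

def match_slots(patterns, path):
    """Return (slot_name, wildcards) for the unique pattern matching path.

    patterns: dict of slot name -> pattern string (a rule's input or output).
    Raises ValueError (standing in for WorkflowError) on zero or multiple matches.
    """
    hits = []
    for name, pattern in patterns.items():
        m = pattern_to_regex(pattern).match(path)
        if m:
            hits.append((name, m.groupdict()))
    if len(hits) > 1:
        raise ValueError(f"multiple slots match {path!r}: ambiguous")
    if not hits:
        raise ValueError(f"no slot matches {path!r}")
    return hits[0]
```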

External Interface

Calling Conventions

metarules.align_pipeline.bam returns the pattern string "aligned/{sample}.sorted.bam", identical to rules.sort_index.output.bam.
Snakemake's existing wildcard inference handles it without any special-casing.

# Single terminal output — wildcard inferred from context, used positionally
rule list_index:
    input:
        metarules.align_pipeline.index,
    shell: "ls {input[0]}"
 
# Named terminal outputs
rule call_variants:
    input:
        bam   = metarules.align_pipeline.bam,
        index = metarules.align_pipeline.index,
    output: "variants/{sample}.vcf"
    shell: "bcftools call {input.bam} -o {output}"
 
# Multiple slots as a named group — treated as one metarule instance;
# up-to-date checks are unified across all requested slots
rule fastqc:
    input:
        align = metarules.align_pipeline.outputs("bam", "fq1"),
        ref   = "ref/hg38.fa",
    output: "qc/{sample}.html"
    shell: "fastqc {input.align.fq1} --bam {input.align.bam} --ref {input.ref}"

outputs(...) returns a MetaruleOutputGroup. Requesting a slot not declared in output: or expose: raises a WorkflowError when the metarule carries any such declarations.
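A minimal sketch of that validation, under the assumption that the declared interface is available as a plain dict of slot name to pattern (names below are illustrative, not part of the proposal's API surface):

```python
class MetaruleOutputGroup:
    """Named group of requested slots; attribute access returns patterns."""

    def __init__(self, slots):
        self._slots = slots  # slot name -> pattern string

    def __getattr__(self, name):
        try:
            return self._slots[name]
        except KeyError:
            raise AttributeError(name)

def outputs(declared, *requested):
    """declared: merged output:/expose: slots; requested: slot names.

    Raises ValueError (standing in for WorkflowError) if a requested slot
    was never declared in the metarule's interface.
    """
    missing = [s for s in requested if s not in declared]
    if missing:
        raise ValueError(f"undeclared slot(s): {missing}")
    return MetaruleOutputGroup({s: declared[s] for s in requested})
```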

Encapsulation Model

| output: declared | expose: declared | External visibility                                              |
|------------------|------------------|------------------------------------------------------------------|
| No               | No               | All terminal outputs (auto-inferred) + all internal job outputs  |
| Yes              | No               | Only declared output aliases; all internal outputs hidden        |
| Yes              | Yes              | Declared output aliases + declared expose slots; all else hidden |

Encapsulation is implicit: declaring output: or expose: constitutes an interface contract. No separate flag is needed.
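The visibility table reduces to a small decision function. A sketch, assuming slot names are collected into sets (the parameter names are illustrative):

```python
def visible_slots(terminal, internal, output_decl=None, expose_decl=None):
    """Return the set of externally visible slot names.

    terminal/internal: slot names of terminal and internal job outputs.
    output_decl/expose_decl: declared interface slots, or None if the
    directive is absent.
    """
    if output_decl is None and expose_decl is None:
        # no interface contract: everything is auto-inferred and visible
        return set(terminal) | set(internal)
    # declaring output: or expose: hides everything not explicitly listed
    return set(output_decl or ()) | set(expose_decl or ())
```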

temp Interaction in expose:

expose: simultaneously names an internal output and declares its retention policy:

expose:
    fq1 = jobs.trim_r1.output[0],        # default cancels any `temp` annotation
    fq2 = temp(jobs.trim_r2.output[0]),  # temp() explicitly declares it
  • A slot in expose: without temp() cancels any temp annotation from the originating rule: the file must be kept for external consumers.
  • A slot in expose: with temp() remains temporary; consuming jobs must complete before the file is deleted.
  • Slots not in expose: retain whatever temp status the originating rule declared.
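The three retention rules above can be condensed into one lookup. A sketch with illustrative names, where `exposed` maps each expose: slot to whether it was wrapped in temp():

```python
def is_temp(slot, exposed, rule_declared_temp):
    """Resolve the effective temp status of an internal output file.

    exposed: dict of expose: slot name -> bool (True iff wrapped in temp()).
    rule_declared_temp: the temp flag from the originating rule.
    """
    if slot in exposed:
        # expose: overrides the originating rule's annotation either way
        return exposed[slot]
    # unexposed slots keep whatever the rule declared
    return rule_declared_temp
```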

Referencing Internal Jobs in expose:

When only one job instantiates a given rule, it can be referenced by rule name directly:

expose:
    fq = jobs.trim.output[0]   # unambiguous: only one trim job

When multiple jobs instantiate the same rule, .named() is required:

expose:
    fq1 = jobs.trim_r1.output[0]   # after .named("trim_r1") in subdag: block

Referencing a rule name when multiple unnamed jobs exist for that rule raises a WorkflowError.
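These lookup rules could be implemented with a small proxy object. A hypothetical sketch (not the proposal's actual `jobs` implementation): explicit `.named()` aliases win, and a bare rule name resolves only when exactly one job instantiates that rule.

```python
class JobsProxy:
    """Resolves `jobs.<name>` references inside expose:."""

    def __init__(self):
        self._named = {}    # .named() alias -> job
        self._by_rule = {}  # rule name -> list of jobs for that rule

    def register(self, rule_name, job, alias=None):
        if alias:
            self._named[alias] = job
        self._by_rule.setdefault(rule_name, []).append(job)

    def __getattr__(self, name):
        if name in self._named:
            return self._named[name]
        candidates = self._by_rule.get(name, [])
        if len(candidates) == 1:
            return candidates[0]  # unambiguous: only one job for this rule
        # zero or several unnamed jobs: ValueError stands in for WorkflowError
        raise ValueError(f"ambiguous or unknown job reference: {name}")
```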

Execution

Integration with the Global DAG

When a file matching a metarule's output: pattern is requested:

  1. The subdag: function is called with the resolved wildcards.
  2. Job objects created during execution are collected.
  3. Each job is added to the global DAG. If an identical job (same rule and wildcards) already exists, the new edges are merged into it.

All injected jobs are full participants in the global DAG: they can be depended upon by other rules and are subject to the same scheduling and resource logic.

subdag: blocks are evaluated lazily at DAG construction time, not at parse time, ensuring that use rule substitutions are fully resolved before any subdag: executes.
Checkpoints are not supported inside subdag: (raising CheckpointException): the sub-graph must be fully determined at DAG construction time, which is fundamentally incompatible with checkpoint's two-phase evaluation model.

Up-to-Date Checking

When temp-marked intermediate files have been deleted after execution, the standard per-job up-to-date check would incorrectly trigger re-execution. metarule introduces a completion record to handle this: only files declared in output: or expose: are checked for presence; missing intermediates are ignored unless explicitly needed.

Re-execution decisions for the minimal example, where bowtie2 declares
temp("temp/{sample}.bam"):

| Condition                                                          | Action                                                                       |
|--------------------------------------------------------------------|------------------------------------------------------------------------------|
| Any output: or expose: file missing                                | Re-run entire metarule                                                       |
| Intermediate (e.g. .bam) missing, not referenced externally        | Do nothing                                                                   |
| Input (.fastq.gz) mtime/checksum changed                           | Re-run entire metarule                                                       |
| subdag: source hash changed (if --trigger code enabled)            | Re-run entire metarule                                                       |
| Internal job output required by another rule outside the metarule  | Run that job alone; completion record and output:/expose: files unaffected   |

--forcerun metarule_name re-runs the metarule as a whole by deleting the completion record and re-running all internal jobs.
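The core of the completion-record check is small. A sketch under the stated semantics (the function name and the injectable `exists` predicate are assumptions for illustration): only declared interface files are probed, so deleted temp intermediates never trigger a re-run on their own.

```python
import os

def needs_rerun(interface_files, record_exists, exists=os.path.exists):
    """Decide whether the whole metarule must re-run.

    interface_files: files declared in output: or expose:.
    record_exists: whether the completion record is present.
    Missing intermediates are deliberately never inspected here.
    """
    if not record_exists:
        # --forcerun deletes the record, forcing this branch
        return True
    return any(not exists(f) for f in interface_files)
```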

Appendix: Low-Level Semantics of >> and <<

This section documents the underlying design considered during drafting and is not part of the core proposal.

The >> and << operators are aliases over a single low-level method Rule.match():

rules.x.match(
    file,                              # str, or Job (uses output[0] as file)
    slot = lambda slots: slots[0],     # which slot of `file` / Job to use
    match_on = "input",                # which side of rules.x to match against
)

The two parameters cover all cases:

| Expression             | file                     | match_on |
|------------------------|--------------------------|----------|
| j_a >> rules.x         | j_a (uses j_a.output[0]) | "input"  |
| j_a << rules.x         | j_a (uses j_a.input[0])  | "output" |
| output.bam << rules.x  | "aligned/S1.sorted.bam"  | "output" |

Operator implementations:

# Job.__rshift__
def __rshift__(self, other):
    if isinstance(other, Rule):
        return other.match(self, slot=lambda s: s[0], match_on="input")
    if isinstance(other, Job):
        # both sides already resolved: only register the edge
        self._dag.add_edge(self, other)
        return other
    raise TypeError

# Job.__lshift__
def __lshift__(self, other):
    if isinstance(other, Rule):
        return other.match(self, slot=lambda s: s[0], match_on="output")
    if isinstance(other, Job):
        # converge: `other` becomes an additional dependency of `self`
        # (as in `j_merge << j_trim_r2`)
        self._dag.add_edge(other, self)
        return self
    raise TypeError

# str.__lshift__ cannot be overridden; Rule.__rlshift__ handles `str << rules.x`
# Rule.__rlshift__
def __rlshift__(self, file):
    if isinstance(file, str):
        return self.match(file, slot=lambda s: s, match_on="output")
    raise TypeError

slot accepts a callable over the named output/input slots, enabling explicit selection when the default [0] is insufficient:

rules.sort_index.match(j_merge, slot=lambda s: s["bam"], match_on="input")
