Skip to content

Commit

Permalink
Multiple inputs / overriding inputs
Browse files Browse the repository at this point in the history
By having all phylogenetic workflows start from two lists of inputs
(`config.inputs`, `config.additional_inputs`) we enable a broad range of
uses with a consistent interface.

1. Using local ingest files is trivial (see added docs) and doesn't need
   a bunch of special-cased logic that is prone to falling out of date
   (as it had indeed done)
2. Adding extra / private data follows the similar pattern, with an
   additional config list being used so that we are explicit that the
   new data is additional and enforce an ordering which is needed for
   predictable `augur merge` behaviour. The canonical data can be
   removed / replaced via step (1) if needed.

I considered adding additional data after the subtype-filtering step,
which would avoid the need to add subtype in the metadata but requires
encoding this in the config overlay. I felt the chosen way was simpler
and more powerful.

Note that this workflow uses an old version of the CI workflow,
<https://github.com/nextstrain/.github/blob/v0/.github/workflows/pathogen-repo-ci.yaml#L233-L240>
which copies `example_data`. We could upgrade to the latest version
and use a config overlay to swap out the canonical inputs with the
example data.

Note that one of the side effects of the current implementation is that
merged inputs use the same filepath irrespective of the workflow. For
instance, both gisaid & h5n1-cattle-outbreak use the intermediate path
`results/metadata_merged.tsv`, which means it's not possible to maintain
runs of both those analysis concurrently if both were to use merged
inputs. Using separate analysis directories, e.g.
<#103> will help avoid this
shortcoming.
  • Loading branch information
jameshadfield committed Dec 16, 2024
1 parent 0780396 commit 7a491d1
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 62 deletions.
85 changes: 71 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,32 +108,89 @@ Note that you may need to remove any existing data in `results/` in order for sn

#### Using locally ingested data (instead of downloading from S3)

Run the pipeline with `--config 'local_ingest=True'` to use the locally available files produced by the ingest pipeline (see `./ingest/README.md` for details on how to run).
Specifically, the files needed are `ingest/results/metadata.tsv` and `ingest/results/sequences_{SEGMENT}.fasta`.
If you have run the fauna ingest pipeline locally, you will have files such as
```
ingest/fauna/results/
├── metadata.tsv
├── sequences_ha.fasta
├── sequences_mp.fasta
├── sequences_na.fasta
├── sequences_np.fasta
├── sequences_ns.fasta
├── sequences_pa.fasta
├── sequences_pb1.fasta
└── sequences_pb2.fasta
```

Running with `--config 'inputs=[{"name": "local-ingest", "metadata": "ingest/fauna/results/metadata.tsv", "sequences": "ingest/fauna/results/sequences_{segment}.fasta"}]'` will switch the default S3 source to the local paths listed above.
Alternatively, you can use a config overlay YAML and supply it via `--configfile`:

```yaml
inputs:
- name: local-ingest
metadata: ingest/fauna/results/metadata.tsv
sequences: ingest/fauna/results/sequences_{segment}.fasta
```
### To modify this build to work with your own data
For the **h5n1-cattle-outbreak** the approach is similar, but you'll want to replace "fauna" with "joined-ncbi", "ncbi" or "andersen-lab", as appropriate.
## To modify this build to work with your own data
Although the simplest way to generate a custom build is via the quickstart build, you are welcome to clone this repo and use it as a starting point for running your own, local builds if you'd like. The [Nextstrain docs](https://docs.nextstrain.org/en/latest/index.html) are a fantastic resource for getting started with the Nextstrain pipeline, and include some [great tutorials](https://docs.nextstrain.org/en/latest/install.html) to get you started. This build is slightly more complicated than other builds, and has a few custom functions in it to accommodate all the features on [nextstrain.org](https://nextstrain.org/avian-flu), and makes use of wildcards for both subtypes and gene segments. If you'd like to adapt this full, non-simplified pipeline here to your own data (which you may want to do if you also want to annotate clades), you would need to make a few changes and additions:
#### 1. Data is stored on a private S3 bucket
### Use additional metadata and/or sequences
The phylogenetics pipeline starts by downloading data from a private S3 bucket.
If you don't have credentials for this bucket you can run the build using the example data provided in this repository.
Before running the build, copy the example sequences and metadata into the `data/` directory like so:
Using a config YAML provided via `--configfile`, or the same data via `--config` you can supply additional inputs.
For instance, this repo has a small set of example metadata + HA sequences.
We could add these to the default inputs via:

```bash
mkdir -p data/
cp -r -v example_data/* data/
```yaml
additional_inputs:
- name: example-data
metadata: example_data/gisaid/metadata.tsv
sequences:
ha: example_data/gisaid/sequences_ha.fasta
```

Which will merge `example_data/gisaid/metadata.tsv` with the default metadata, and add the sequences from `example_data/gisaid/sequences_ha.fasta` to the default HA sequences (all other segments will just use the default sequences).

If you had sequences for each segment you could use a wildcard for the segment like so:

```yaml
additional_inputs:
- name: example-data
metadata: example_data/gisaid/metadata.tsv
sequences: example_data/gisaid/sequences_{segment}.fasta
```

Then run the build with the test target, a H5N1 HA "all time" tree:

> NOTE: These added data will be subject to the same filtering rules as the starting data.
At a minimum, you'll want to ensure new sequences have metadata which defines their subtype and date, as filter steps will prune out those without valid values here.

> NOTE: Metadata merging is via `augur merge` and we add one-hot columns for each input source with the column name `input_{NAME}`, for instance the above example would have a `input_secret` column with values of `1` for metadata rows which were included in `secret.tsv` and `0` otherwise.
You can use this for additional filtering commands as needed.


### Replace the starting metadata and sequences

Using the same approach as above but replacing `additional_inputs` with `inputs` will replace the default inputs.
See "Using locally ingested data (instead of downloading from S3)" (above) for an example of this.


### Run a single H5N1 HA all-time analysis

There is a `test_target` rule which will produce a single H5N1 HA "all-time" tree.
You can combine this with the example data (see above) to run a single build using a small number of sequences:


``` bash
snakemake test_target
snakemake --cores 2 \
--configfile config/gisaid.yaml \
--config 'inputs=[{"name": "example", "metadata": "example_data/gisaid/metadata.tsv", "sequences": "example_data/gisaid/sequences_{segment}.fasta"}]' \
-pf test_target
```

If you'd like to consistently run your own data, then you can place your fasta file in `data`. Alternatively, you can alter the `Snakefile` to remove references to S3 and add paths to your own files (see rules `download_sequences` and `download_metadata`).
### clade labeling

#### 2. clade labeling
If you'd like to run clade labeling, you will need to install [LABEL](https://wonder.cdc.gov/amd/flu/label/) yourself. This pipeline assumes that LABEL is located in `avian-flu/flu-amd/`, and should work if you install it into the `avian-flu` directory. If you do not need to label clades, then you can delete `rule add_h5_clade`, the function `metadata_by_wildcards`. You will need to make sure that all references to `metadata` in the pipeline are referencing `metadata_subtype_segment`, not `metadata-with-clade_subtype_segment`, which is generated by `rule add_h5_clade` and adds a new column to the metadata file with clade information.
168 changes: 134 additions & 34 deletions Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,6 @@ SEGMENTS = ["pb2", "pb1", "pa", "ha","np", "na", "mp", "ns"]
SAME_STRAINS = bool(config.get('same_strains_per_segment', False))

NEXTSTRAIN_PUBLIC_BUCKET = "s3://nextstrain-data/"
S3_SRC = config.get('s3_src', {})
LOCAL_INGEST = config.get('local_ingest', None)

def sanity_check_config():
assert LOCAL_INGEST or S3_SRC, "The config must define either 's3_src' or 'local_ingest'"
# NOTE: we could relax the following exclusivity of S3_SRC and LOCAL_INGEST
# if we want to use `--config local_ingest=gisaid` overrides.
assert not (S3_SRC and LOCAL_INGEST), "The config defined both 'local_ingest' and 's3_src', which are mutually exclusive"
if S3_SRC:
assert isinstance(S3_SRC, dict) and all([k in S3_SRC for k in ("name", "sequences", "metadata")]), \
"Config 's3_src' must be a dict with 'name', 'sequences' and 'metadata' keys"

sanity_check_config()

def collect_builds():
"""
Expand Down Expand Up @@ -87,43 +74,156 @@ def subtypes_by_subtype_wildcard(wildcards):
"`subtypes_by_subtype_wildcard` -- is there a typo in the subtype you are targetting?")
return(db[wildcards.subtype])

rule download_sequences:
class InvalidConfigError(Exception):
pass

# ------------- helper functions to collect, merge & download input files ------------------- #

def _input_info(name, address, is_metadata=False, segment=None):
assert (is_metadata and not segment) or (segment and not is_metadata), "Workflow bug"

# Address may include the '{segment}' wildcard
formatted_address = address.format(segment=segment) \
if (segment and '{segment}' in address) \
else address

# addresses may be a remote filepath or a local file
if address.startswith('s3://'):
# path defines the location of the downloaded file (i.e. produced by a download rule)
path = f"data/{name}/metadata.tsv" \
if is_metadata \
else f"data/{name}/sequences_{segment}.fasta"
elif address.lower().startswith(r'http[s]:\/\/'):
raise InvalidConfigError("Workflow cannot yet handle HTTP[S] inputs")
else:
path = formatted_address

return {
'name': name,
'path': path,
'merge_arg': f"{name}={path}",
'address': formatted_address
}

def gather_inputs(is_metadata=False, segment=None):
"""
Gather all available inputs for metadata or a given segment. Returns all applicable inputs as a list
of dictionaries. See `_input_info` for the dictionary structure.
"""
assert (is_metadata and not segment) or (segment and not is_metadata), "Workflow bug"
inputs = [*config['inputs'], *config.get('additional_inputs', [])]

# Some of these checks can be removed once we have a schema and enforce it
if len(inputs)==0:
raise InvalidConfigError("Config must define at least one element in 'inputs' or 'additional_inputs' lists")
if not all([isinstance(el, dict) for el in inputs]):
raise InvalidConfigError("All of the elements in the config's 'inputs' or 'additional_inputs' lists must be dictionaries. "
"If you've used a command line '--config' double check your quoting.")
if not all(['name' in el for el in inputs]): # can be removed once schema validation is in place
raise InvalidConfigError("Config elements in 'inputs' or 'additional_inputs' lists must each specify a 'name'")
if len({el['name'] for el in inputs})!=len(inputs):
raise InvalidConfigError("Names of inputs ('inputs' and 'additional_inputs') must be unique")
if is_metadata and len([el for el in inputs if 'metadata' in el])==0:
raise InvalidConfigError("Config must define at least one element in 'inputs' or 'additional_inputs' lists with metadata")
if segment and len([el for el in inputs if 'sequences' in el])==0:
raise InvalidConfigError("Config must define at least one element in 'inputs' or 'additional_inputs' lists with sequences")

if is_metadata:
return [
_input_info(el['name'], el['metadata'], is_metadata=True)
for el in inputs
if 'metadata' in el
]

sequence_addresses = [
(el['name'], el['sequences'] if isinstance(el['sequences'], str) else el['sequences'].get(segment, None))
for el in inputs
if 'sequences' in el
]
sequence_addresses = [pair for pair in sequence_addresses if pair[1] is not None]
if len(sequence_addresses)==0:
raise InvalidConfigError(f"Config must define at least one sequence for segment {segment!r} in inputs / additional_inputs.")
return [_input_info(*pair, segment=segment) for pair in sequence_addresses]

def input_name_to_address(name, is_metadata=False, segment=None):
"""
Helper function to return the address behind a particular input's 'name'
"""
return [el for el in gather_inputs(is_metadata, segment) if el['name']==name][0]['address']

def input_metadata(wildcards):
metadata = gather_inputs(is_metadata=True)
if len(metadata)==1:
return metadata[0]['path']
return "results/metadata_merged.tsv"


def input_sequences(wildcards):
sequences = gather_inputs(segment=wildcards.segment)
if len(sequences)==1:
return sequences[0]['path']
return "results/sequences_merged_{segment}.fasta"

rule download_s3_sequences:
output:
sequences = f"data/{S3_SRC.get('name', None)}/sequences_{{segment}}.fasta",
sequences = "data/{input_name}/sequences_{segment}.fasta",
params:
address=lambda w: S3_SRC.get('sequences', None).format(segment=w.segment),
no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('sequences', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else ""
address=lambda w: input_name_to_address(w.input_name, segment=w.segment),
no_sign_request=lambda w: "--no-sign-request" \
if input_name_to_address(w.input_name, segment=w.segment).startswith(NEXTSTRAIN_PUBLIC_BUCKET) \
else "",
shell:
"""
aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.sequences}
"""

rule download_metadata:
rule download_s3_metadata:
output:
metadata = f"data/{S3_SRC.get('name', None)}/metadata.tsv",
metadata = "data/{input_name}/metadata.tsv",
params:
address=S3_SRC.get('metadata', None),
no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('metadata', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else ""
address=lambda w: input_name_to_address(w.input_name, is_metadata=True),
no_sign_request=lambda w: "--no-sign-request" \
if input_name_to_address(w.input_name, is_metadata=True).startswith(NEXTSTRAIN_PUBLIC_BUCKET) \
else "",
shell:
"""
aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.metadata}
"""

rule merge_metadata:
"""
This rule should only be invoked if there are multiple defined metadata inputs
(config.inputs + config.additional_inputs)
"""
input:
metadata = [el['path'] for el in gather_inputs(is_metadata=True)]
params:
metadata = " ".join([el['merge_arg'] for el in gather_inputs(is_metadata=True)])
output:
metadata = "results/metadata_merged.tsv"
shell:
r"""
augur merge \
--metadata {params.metadata} \
--source-columns 'input_{{NAME}}' \
--output-metadata {output.metadata}
"""

def input_metadata(wildcards):
if S3_SRC:
return f"data/{S3_SRC['name']}/metadata.tsv",
elif LOCAL_INGEST:
return f"ingest/{LOCAL_INGEST}/results/metadata.tsv",
raise Exception() # already caught by `sanity_check_config` above, this is just being cautious

def input_sequences(wildcards):
if S3_SRC:
return f"data/{S3_SRC['name']}/sequences_{wildcards.segment}.fasta",
elif LOCAL_INGEST:
return f"ingest/{LOCAL_INGEST}/results/sequences_{wildcards.segment}.fasta"
raise Exception() # already caught by `sanity_check_config` above, this is just being cautious
rule merge_sequences:
"""
This rule should only be invoked if there are multiple defined metadata inputs
(config.inputs + config.additional_inputs) for this particular segment
"""
input:
sequences = lambda w: [el['path'] for el in gather_inputs(segment=w.segment)]
output:
sequences = "results/sequences_merged_{segment}.fasta"
shell:
r"""
seqkit rmdup {input.sequences} > {output.sequences}
"""

# -------------------------------------------------------------------------------------------- #

rule filter_sequences_by_subtype:
input:
Expand Down
12 changes: 5 additions & 7 deletions config/gisaid.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ segments:
- mp
- ns

#### Parameters which define the input source ####
s3_src:
name: gisaid
metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst
sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst
local_ingest: false
# P.S. To use local ingest files, comment out s3_src and change to local_ingest: fauna
# Input source(s) - See README.md for how to use local files instead and/or add additional inputs
inputs:
- name: gisaid
metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst
sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst


#### Parameters which control large overarching aspects of the build
Expand Down
13 changes: 6 additions & 7 deletions config/h5n1-cattle-outbreak.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ segments:
- ns


#### Parameters which define the input source ####
s3_src:
name: ncbi
metadata: s3://nextstrain-data/files/workflows/avian-flu/h5n1/metadata.tsv.zst
sequences: s3://nextstrain-data/files/workflows/avian-flu/h5n1/{segment}/sequences.fasta.zst
local_ingest: false
# P.S. To use local ingest files, comment out s3_src and change to local_ingest: joined-ncbi (e.g.)
# Input source(s) - See README.md for how to use local files instead and/or add additional inputs
inputs:
- name: ncbi
metadata: s3://nextstrain-data/files/workflows/avian-flu/h5n1/metadata.tsv.zst
sequences: s3://nextstrain-data/files/workflows/avian-flu/h5n1/{segment}/sequences.fasta.zst



#### Parameters which control large overarching aspects of the build
Expand Down

0 comments on commit 7a491d1

Please sign in to comment.