From fde2126aa02784a0aefee595b6a2d52378d3c820 Mon Sep 17 00:00:00 2001 From: james hadfield Date: Mon, 16 Dec 2024 13:27:38 +1300 Subject: [PATCH] Multiple inputs / overriding inputs By having all phylogenetic workflows start from two lists of inputs (`config.inputs`, `config.additional_inputs`) we enable a broad range of uses with a consistent interface. 1. Using local ingest files is trivial (see added docs) and doesn't need a bunch of special-cased logic that is prone to falling out of date (as it had indeed done) 2. Adding extra / private data follows the similar pattern, with an additional config list being used so that we are explicit that the new data is additional and enforce an ordering which is needed for predictable `augur merge` behaviour. The canonical data can be removed / replaced via step (1) if needed. I considered adding additional data after the subtype-filtering step, which would avoid the need to add subtype in the metadata but requires encoding this in the config overlay. I felt the chosen way was simpler and more powerful. When considering sequences the structure is more complex than metadata because the influenza genome is segmented and we wish to allow users to provide additional data for only some segments (see docstring for `_parse_config_input`). For non-segmented pathogens the simpler structure used here for metadata could also be used for sequences. This workflow uses an old version of the CI workflow, which copies `example_data`. We could upgrade to the latest version and use a config overlay to swap out the canonical inputs with the example data. Note that one of the side effects of the current implementation is that merged inputs use the same filepath irrespective of the workflow. For instance, both gisaid & h5n1-cattle-outbreak use the intermediate path `results/metadata_merged.tsv`, which means it's not possible to maintain runs of both those analysis concurrently if both were to use merged inputs. Using separate analysis directories, e.g. will help avoid this shortcoming. --- README.md | 86 +++++++++++++--- Snakefile | 163 ++++++++++++++++++++++++------- config/gisaid.yaml | 12 +-- config/h5n1-cattle-outbreak.yaml | 13 ++- 4 files changed, 212 insertions(+), 62 deletions(-) diff --git a/README.md b/README.md index 4ec2f54..86049f0 100755 --- a/README.md +++ b/README.md @@ -108,32 +108,90 @@ Note that you may need to remove any existing data in `results/` in order for sn #### Using locally ingested data (instead of downloading from S3) -Run the pipeline with `--config 'local_ingest=True'` to use the locally available files produced by the ingest pipeline (see `./ingest/README.md` for details on how to run). -Specifically, the files needed are `ingest/results/metadata.tsv` and `ingest/results/sequences_{SEGMENT}.fasta`. +If you have run the fauna ingest pipeline locally, you will have files such as +``` +ingest/fauna/results/ +├── metadata.tsv +├── sequences_ha.fasta +├── sequences_mp.fasta +├── sequences_na.fasta +├── sequences_np.fasta +├── sequences_ns.fasta +├── sequences_pa.fasta +├── sequences_pb1.fasta +└── sequences_pb2.fasta +``` + +Running with `--config 'inputs=[{"name": "local-ingest", "metadata": "ingest/fauna/results/metadata.tsv", "sequences": "ingest/fauna/results/sequences_{segment}.fasta"}]'` will switch the default S3 source to the local paths listed above. +Alternatively, you can use a config overlay YAML and supply it via `--configfile`: + +```yaml +inputs: + - name: local-ingest + metadata: ingest/fauna/results/metadata.tsv + sequences: ingest/fauna/results/sequences_{segment}.fasta +``` -### To modify this build to work with your own data +For the **h5n1-cattle-outbreak** the approach is similar, but you'll want to replace "fauna" with "joined-ncbi", "ncbi" or "andersen-lab", as appropriate. + +## To modify this build to work with your own data Although the simplest way to generate a custom build is via the quickstart build, you are welcome to clone this repo and use it as a starting point for running your own, local builds if you'd like. The [Nextstrain docs](https://docs.nextstrain.org/en/latest/index.html) are a fantastic resource for getting started with the Nextstrain pipeline, and include some [great tutorials](https://docs.nextstrain.org/en/latest/install.html) to get you started. This build is slightly more complicated than other builds, and has a few custom functions in it to accommodate all the features on [nextstrain.org](https://nextstrain.org/avian-flu), and makes use of wildcards for both subtypes and gene segments. If you'd like to adapt this full, non-simplified pipeline here to your own data (which you may want to do if you also want to annotate clades), you would need to make a few changes and additions: -#### 1. Data is stored on a private S3 bucket +### Use additional metadata and/or sequences -The phylogenetics pipeline starts by downloading data from a private S3 bucket. -If you don't have credentials for this bucket you can run the build using the example data provided in this repository. -Before running the build, copy the example sequences and metadata into the `data/` directory like so: +Using a config YAML provided via `--configfile`, or the same data via `--config` you can supply additional inputs. +For instance, this repo has a small set of example metadata + HA sequences. +We could add these to the default inputs via: -```bash -mkdir -p data/ -cp -r -v example_data/* data/ +```yaml +additional_inputs: + - name: example-data + metadata: example_data/gisaid/metadata.tsv + sequences: + ha: example_data/gisaid/sequences_ha.fasta +``` + +Which will merge `example_data/gisaid/metadata.tsv` with the default metadata, and add the sequences from `example_data/gisaid/sequences_ha.fasta` to the default HA sequences (all other segments will just use the default sequences). + +If you had sequences for each segment you could use a wildcard for the segment like so: + +```yaml +additional_inputs: + - name: example-data + metadata: example_data/gisaid/metadata.tsv + sequences: example_data/gisaid/sequences_{segment}.fasta ``` -Then run the build with the test target, a H5N1 HA "all time" tree: + +> NOTE: These added data will be subject to the same filtering rules as the starting data. + At a minimum, you'll want to ensure new sequences have metadata which defines their subtype and date, as filter steps will prune out those without valid values here. + +> NOTE: Metadata merging is via `augur merge` and we add a column per input source to indicate the origin of data. + For instance the above example would have a `input_example-data` column with values of `1` for metadata rows which were included in `example_data/gisaid/metadata.tsv` and `0` otherwise. + You can use this for additional filtering commands as needed. + + +### Replace the starting metadata and sequences + +Using the same approach as above but replacing `additional_inputs` with `inputs` will replace the default inputs. +See "Using locally ingested data (instead of downloading from S3)" (above) for an example of this. + + +### Run a single H5N1 HA all-time analysis + +There is a `test_target` rule which will produce a single H5N1 HA "all-time" tree. +You can combine this with the example data (see above) to run a single build using a small number of sequences: + ``` bash -snakemake test_target +snakemake --cores 2 \ + --configfile config/gisaid.yaml \ + --config 'inputs=[{"name": "example", "metadata": "example_data/gisaid/metadata.tsv", "sequences": "example_data/gisaid/sequences_{segment}.fasta"}]' \ + -pf test_target ``` -If you'd like to consistently run your own data, then you can place your fasta file in `data`. Alternatively, you can alter the `Snakefile` to remove references to S3 and add paths to your own files (see rules `download_sequences` and `download_metadata`). +### clade labeling -#### 2. clade labeling If you'd like to run clade labeling, you will need to install [LABEL](https://wonder.cdc.gov/amd/flu/label/) yourself. This pipeline assumes that LABEL is located in `avian-flu/flu-amd/`, and should work if you install it into the `avian-flu` directory. If you do not need to label clades, then you can delete `rule add_h5_clade`, the function `metadata_by_wildcards`. You will need to make sure that all references to `metadata` in the pipeline are referencing `metadata_subtype_segment`, not `metadata-with-clade_subtype_segment`, which is generated by `rule add_h5_clade` and adds a new column to the metadata file with clade information. diff --git a/Snakefile b/Snakefile index 0a9fa55..95ef489 100755 --- a/Snakefile +++ b/Snakefile @@ -15,19 +15,6 @@ SEGMENTS = ["pb2", "pb1", "pa", "ha","np", "na", "mp", "ns"] SAME_STRAINS = bool(config.get('same_strains_per_segment', False)) NEXTSTRAIN_PUBLIC_BUCKET = "s3://nextstrain-data/" -S3_SRC = config.get('s3_src', {}) -LOCAL_INGEST = config.get('local_ingest', None) - -def sanity_check_config(): - assert LOCAL_INGEST or S3_SRC, "The config must define either 's3_src' or 'local_ingest'" - # NOTE: we could relax the following exclusivity of S3_SRC and LOCAL_INGEST - # if we want to use `--config local_ingest=gisaid` overrides. - assert not (S3_SRC and LOCAL_INGEST), "The config defined both 'local_ingest' and 's3_src', which are mutually exclusive" - if S3_SRC: - assert isinstance(S3_SRC, dict) and all([k in S3_SRC for k in ("name", "sequences", "metadata")]), \ - "Config 's3_src' must be a dict with 'name', 'sequences' and 'metadata' keys" - -sanity_check_config() def collect_builds(): """ @@ -87,43 +74,151 @@ def subtypes_by_subtype_wildcard(wildcards): "`subtypes_by_subtype_wildcard` -- is there a typo in the subtype you are targetting?") return(db[wildcards.subtype]) -rule download_sequences: +class InvalidConfigError(Exception): + pass + +# ------------- helper functions to collect, merge & download input files ------------------- # + +def _parse_config_input(input): + """ + Parses information from an individual config-defined input, i.e. an element within `config.inputs` or `config.additional_inputs` + and returns information snakemake rules can use to obtain the underlying data. + + The structure of `input` is a dictionary with keys: + - name:string (required) + - metadata:string (optional) - a s3 URI or a local file path + - sequences:string|dict[string,string] (optional) - either a s3 URI or a local file path, in which case + it must include a '{segment}' wildcard substring, or a dict of segment → s3 URI or local file path, + in which case it must not include the wildcard substring. + + Returns a dictionary with optional keys: + - metadata:string - the relative path to the metadata file. If the original data was remote then this represents + the output of a rule which downloads the file + - metadata_location:string - the URI for the remote file if applicable else `None` + - sequences:function. Takes in wildcards and returns the relative path to the sequences FASTA for the provided + segment wildcard, or returns `None` if this input doesn't define sequences for the provided segment. + - sequences_location:function. Takes in wildcards and returns the URI for the remote file, or `None`, where applicable. + + Raises InvalidConfigError + """ + name = input['name'] + lambda_none = lambda w: None + + info = {'metadata': None, 'metadata_location': None, 'sequences': lambda_none, 'sequences_location': lambda_none} + + def _source(uri, *, s3, local): + if uri.startswith('s3://'): + return s3 + elif uri.lower().startswith('http://') or uri.lower().startswith('https://'): + raise InvalidConfigError("Workflow cannot yet handle HTTP[S] inputs") + return local + + if location:=input.get('metadata', False): + info['metadata'] = _source(location, s3=f"data/{name}/metadata.tsv", local=location) + info['metadata_location'] = _source(location, s3=location, local=None) + + if location:=input.get('sequences', False): + if isinstance(location, dict): + info['sequences'] = lambda w: _source(location[w.segment], s3=f"data/{name}/sequences_{w.segment}.fasta", local=location[w.segment]) \ + if w.segment in location \ + else None + info['sequences_location'] = lambda w: _source(location[w.segment], s3=location[w.segment], local=None) \ + if w.segment in location \ + else None + elif isinstance(location, str): + info['sequences'] = _source(location, s3=lambda w: f"data/{name}/sequences_{w.segment}.fasta", local=lambda w: location.format(segment=w.segment)) + info['sequences_location'] = _source(location, s3=lambda w: location.format(segment=w.segment), local=lambda_none) + else: + raise InvalidConfigError(f"Config input for {name} specifies sequences in an unknown format; must be dict or string") + + return info + + +def _gather_inputs(): + all_inputs = [*config['inputs'], *config.get('additional_inputs', [])] + + if len(all_inputs)==0: + raise InvalidConfigError("Config must define at least one element in config.inputs or config.additional_inputs lists") + if not all([isinstance(i, dict) for i in all_inputs]): + raise InvalidConfigError("All of the elements in config.inputs and config.additional_inputs lists must be dictionaries" + "If you've used a command line '--config' double check your quoting.") + if len({i['name'] for i in all_inputs})!=len(all_inputs): + raise InvalidConfigError("Names of inputs (config.inputs and config.additional_inputs) must be unique") + if not all(['name' in i and ('sequences' in i or 'metadata' in i) for i in all_inputs]): + raise InvalidConfigError("Each input (config.inputs and config.additional_inputs) must have a 'name' and 'metadata' and/or 'sequences'") + + return {i['name']: _parse_config_input(i) for i in all_inputs} + +input_sources = _gather_inputs() + +def input_metadata(wildcards): + inputs = [info['metadata'] for info in input_sources.values() if info.get('metadata', None)] + return inputs[0] if len(inputs)==1 else "results/metadata_merged.tsv" + +def input_sequences(wildcards): + inputs = list(filter(None, [info['sequences'](wildcards) for info in input_sources.values() if info.get('sequences', None)])) + return inputs[0] if len(inputs)==1 else "results/sequences_merged_{segment}.fasta" + +rule download_s3_sequences: output: - sequences = f"data/{S3_SRC.get('name', None)}/sequences_{{segment}}.fasta", + sequences = "data/{input_name}/sequences_{segment}.fasta", params: - address=lambda w: S3_SRC.get('sequences', None).format(segment=w.segment), - no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('sequences', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else "" + address = lambda w: input_sources[w.input_name]['sequences_location'](w), + no_sign_request=lambda w: "--no-sign-request" \ + if input_sources[w.input_name]['sequences_location'](w).startswith(NEXTSTRAIN_PUBLIC_BUCKET) \ + else "", shell: """ aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.sequences} """ -rule download_metadata: +rule download_s3_metadata: output: - metadata = f"data/{S3_SRC.get('name', None)}/metadata.tsv", + metadata = "data/{input_name}/metadata.tsv", params: - address=S3_SRC.get('metadata', None), - no_sign_request=lambda w: "--no-sign-request" if S3_SRC.get('metadata', "").startswith(NEXTSTRAIN_PUBLIC_BUCKET) else "" + address = lambda w: input_sources[w.input_name]['metadata_location'], + no_sign_request=lambda w: "--no-sign-request" \ + if input_sources[w.input_name]['metadata_location'].startswith(NEXTSTRAIN_PUBLIC_BUCKET) \ + else "", shell: """ aws s3 cp {params.no_sign_request:q} {params.address:q} - | zstd -d > {output.metadata} """ +rule merge_metadata: + """ + This rule should only be invoked if there are multiple defined metadata inputs + (config.inputs + config.additional_inputs) + """ + input: + **{name: info['metadata'] for name,info in input_sources.items() if info.get('metadata', None)} + params: + metadata = lambda w, input: list(map("=".join, input.items())) + output: + metadata = "results/metadata_merged.tsv" + shell: + r""" + augur merge \ + --metadata {params.metadata:q} \ + --source-columns 'input_{{NAME}}' \ + --output-metadata {output.metadata} + """ -def input_metadata(wildcards): - if S3_SRC: - return f"data/{S3_SRC['name']}/metadata.tsv", - elif LOCAL_INGEST: - return f"ingest/{LOCAL_INGEST}/results/metadata.tsv", - raise Exception() # already caught by `sanity_check_config` above, this is just being cautious - -def input_sequences(wildcards): - if S3_SRC: - return f"data/{S3_SRC['name']}/sequences_{wildcards.segment}.fasta", - elif LOCAL_INGEST: - return f"ingest/{LOCAL_INGEST}/results/sequences_{wildcards.segment}.fasta" - raise Exception() # already caught by `sanity_check_config` above, this is just being cautious +rule merge_sequences: + """ + This rule should only be invoked if there are multiple defined metadata inputs + (config.inputs + config.additional_inputs) for this particular segment + """ + input: + lambda w: list(filter(None, [info['sequences'](w) for info in input_sources.values()])) + output: + sequences = "results/sequences_merged_{segment}.fasta" + shell: + r""" + seqkit rmdup {input:q} > {output.sequences:q} + """ +# -------------------------------------------------------------------------------------------- # rule filter_sequences_by_subtype: input: diff --git a/config/gisaid.yaml b/config/gisaid.yaml index 284d36e..9765859 100644 --- a/config/gisaid.yaml +++ b/config/gisaid.yaml @@ -21,13 +21,11 @@ segments: - mp - ns -#### Parameters which define the input source #### -s3_src: - name: gisaid - metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst - sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst -local_ingest: false -# P.S. To use local ingest files, comment out s3_src and change to local_ingest: fauna +# Input source(s) - See README.md for how to use local files instead and/or add additional inputs +inputs: + - name: gisaid + metadata: s3://nextstrain-data-private/files/workflows/avian-flu/metadata.tsv.zst + sequences: s3://nextstrain-data-private/files/workflows/avian-flu/{segment}/sequences.fasta.zst #### Parameters which control large overarching aspects of the build diff --git a/config/h5n1-cattle-outbreak.yaml b/config/h5n1-cattle-outbreak.yaml index bd0d396..13f6505 100644 --- a/config/h5n1-cattle-outbreak.yaml +++ b/config/h5n1-cattle-outbreak.yaml @@ -20,13 +20,12 @@ segments: - ns -#### Parameters which define the input source #### -s3_src: - name: ncbi - metadata: s3://nextstrain-data/files/workflows/avian-flu/h5n1/metadata.tsv.zst - sequences: s3://nextstrain-data/files/workflows/avian-flu/h5n1/{segment}/sequences.fasta.zst -local_ingest: false -# P.S. To use local ingest files, comment out s3_src and change to local_ingest: joined-ncbi (e.g.) +# Input source(s) - See README.md for how to use local files instead and/or add additional inputs +inputs: + - name: ncbi + metadata: s3://nextstrain-data/files/workflows/avian-flu/h5n1/metadata.tsv.zst + sequences: s3://nextstrain-data/files/workflows/avian-flu/h5n1/{segment}/sequences.fasta.zst + #### Parameters which control large overarching aspects of the build