-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapp.py
32 lines (22 loc) · 1023 Bytes
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import argparse
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from form_etl.pipeline import form_events as beam_pipeline
def main(known_args, beam_options):
    """Launch the form-events pipeline with the parsed command-line settings.

    Args:
        known_args: Parsed argument namespace; each attribute is handed to
            ``beam_pipeline.run`` as a keyword argument of the same name.
        beam_options: Pipeline options object, passed through to
            ``beam_pipeline.run`` under the ``beam_options`` keyword.
    """
    # Expand the namespace into keyword arguments; beam_options stays an
    # explicit keyword so a clashing attribute name still raises TypeError.
    run_kwargs = vars(known_args)
    beam_pipeline.run(**run_kwargs, beam_options=beam_options)
if __name__ == "__main__":
    # Let INFO-level records through the root logger.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # Options owned by this script, in registration order: (flag, add_argument kwargs).
    cli_options = (
        ("--bootstrap_servers", {"default": "localhost:9092"}),
        ("--topics", {"default": "form.events"}),
        ("--group_id", {"default": "form_etl"}),
        ("--form_event_output_path", {"default": "./form_event"}),
        ("--form_field_output_path", {"default": "./form_field"}),
        ("--max_num_records", {"type": int, "default": None}),
        ("--api_uri", {"default": "http://localhost:8000/"}),
    )

    parser = argparse.ArgumentParser()
    for flag, spec in cli_options:
        parser.add_argument(flag, **spec)

    # Arguments the parser does not recognise are not an error here; they are
    # collected in beam_args and given to PipelineOptions.
    known_args, beam_args = parser.parse_known_args()
    beam_options = PipelineOptions(beam_args)

    main(known_args, beam_options)