-
Notifications
You must be signed in to change notification settings - Fork 11
/
beam.py
160 lines (126 loc) · 4.4 KB
/
beam.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# BigQuery destination tables for the two order streams.
# Spec format: project-id:dataset_id.table_id
# NOTE(review): 'project-id' looks like a placeholder; the dataset id used for
# dataset creation further down names project food-orders-407014 -- confirm.
delivered_table_spec, other_table_spec = (
    'project-id:dataset_food_orders.delivered_orders',
    'project-id:dataset_food_orders.other_status_orders',
)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import argparse
from google.cloud import bigquery
# Split the command line: --input is consumed here, every argument the
# parser does not recognise is handed to Beam as a pipeline option.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input',
    dest='input',
    required=True,
    help='Input file to process.',
)
path_args, pipeline_args = parser.parse_known_args()
inputs_pattern = path_args.input

# Build the pipeline from the remaining (unparsed) arguments.
options = PipelineOptions(pipeline_args)
p = beam.Pipeline(options=options)
#Transformation functions
def remove_last_colon(row):
    """Drop one trailing colon from the fifth column of a CSV row.

    Args:
        row: One comma-separated line of text.

    Returns:
        The row re-joined with commas. If the fifth column (index 4) ends
        with ':', that single character is removed; otherwise the row comes
        back unchanged. Rows with fewer than five columns (for example a
        blank line) are returned as-is instead of raising IndexError.
    """
    cols = row.split(',')
    # Guard the index first: short rows have no fifth column to clean.
    if len(cols) > 4 and cols[4].endswith(':'):
        # Strip exactly one colon, so "a::" becomes "a:".
        cols[4] = cols[4][:-1]
    return ','.join(cols)
def remove_special_characters(row):
    """Strip the characters '?', '%' and '&' from a CSV row.

    Args:
        row: One comma-separated line of text.

    Returns:
        The row with every '?', '%' and '&' removed. Commas and all other
        characters are left untouched, so the column count is preserved.
    """
    # Kept as a function-local import, as in the original; presumably so the
    # function stays self-contained when Beam ships it to workers -- confirm.
    import re

    # The character class never matches a comma, so one pass over the whole
    # row gives the same result as cleaning column by column and re-joining,
    # without the repeated string concatenation.
    return re.sub(r'[?%&]', '', row)
def print_row(row):
    """Write ``row`` to standard output; there is no return value."""
    print(row)
#Data processing pipeline
# Read the input (first line skipped as a header), tidy every line, and
# append a constant ",1" column to it.
cleaned_data = (
    p
    | beam.io.ReadFromText(inputs_pattern, skip_header_lines=1)
    | beam.Map(remove_last_colon)
    | beam.Map(lambda line: line.lower())
    | beam.Map(remove_special_characters)
    | beam.Map(lambda line: line + ',1')
)

# Split the cleaned lines on the ninth column (index 8): one collection for
# lines whose value there is 'delivered', one for everything else.
delivered_orders = cleaned_data | 'delivered filter' >> beam.Filter(
    lambda line: line.split(',')[8].lower() == 'delivered'
)
other_orders = cleaned_data | 'Undelivered Filter' >> beam.Filter(
    lambda line: line.split(',')[8].lower() != 'delivered'
)

# Count each collection globally and print the labelled total.
(
    cleaned_data
    | 'count total' >> beam.combiners.Count.Globally()
    | 'total map' >> beam.Map(lambda total: 'Total Count:' + str(total))
    | 'print total' >> beam.Map(print_row)
)
(
    delivered_orders
    | 'count delivered' >> beam.combiners.Count.Globally()
    | 'delivered map' >> beam.Map(lambda total: 'Delivered count:' + str(total))
    | 'print delivered count' >> beam.Map(print_row)
)
(
    other_orders
    | 'count others' >> beam.combiners.Count.Globally()
    | 'other map' >> beam.Map(lambda total: 'Others count:' + str(total))
    | 'print undelivered' >> beam.Map(print_row)
)
#BigQuery
# Make sure the destination dataset exists before the pipeline writes to it.
client = bigquery.Client()
dataset_id = "food-orders-407014.dataset_food_orders"

# Describe the dataset up front so the settings are applied if it is created.
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
dataset.description = "dataset for food orders"

# Pass the configured Dataset object rather than the bare id string, so the
# location and description above are actually used on creation.
# exists_ok=True returns the existing dataset instead of raising when it is
# already there, which replaces the previous get_dataset probe and its bare
# `except:` that hid every kind of failure (auth, network, typos).
dataset_ref = client.create_dataset(dataset, exists_ok=True)
def to_json(csv_str):
    """Map one comma-separated order line onto a dict keyed by column name.

    The first twelve comma-separated values are assigned, in order, to the
    keys listed in ``column_names``; any further values are ignored. A line
    with fewer than twelve values raises ``IndexError``.
    """
    column_names = (
        "customer_id",
        "date",
        "timestamp",
        "order_id",
        "items",
        "amount",
        "mode",
        "restaurant",
        "status",
        "ratings",
        "feedback",
        "new_col",
    )
    values = csv_str.split(',')
    # Index explicitly (rather than zip) so a short line still fails loudly.
    return {name: values[position] for position, name in enumerate(column_names)}
# Every column is loaded as STRING; the column names match the keys of the
# dicts produced by to_json.
table_schema = 'customer_id:STRING,date:STRING,timestamp:STRING,order_id:STRING,items:STRING,amount:STRING,mode:STRING,restaurant:STRING,status:STRING,ratings:STRING,feedback:STRING,new_col:STRING'


def _bigquery_sink(table_spec):
    """Build the WriteToBigQuery transform shared by both order tables."""
    return beam.io.WriteToBigQuery(
        table_spec,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        additional_bq_parameters={'timePartitioning': {'type': 'DAY'}},
    )


# Convert each stream to dict rows and append them to its own table.
(
    delivered_orders
    | 'delivered to json' >> beam.Map(to_json)
    | 'write delivered' >> _bigquery_sink(delivered_table_spec)
)
(
    other_orders
    | 'others to json' >> beam.Map(to_json)
    | 'write other_orders' >> _bigquery_sink(other_table_spec)
)
#Running the pipeline
from apache_beam.runners.runner import PipelineState

ret = p.run()
# run() can return while the job is still in progress, in which case the
# state is not yet DONE and a healthy run would be reported as an error.
# Block until the job reaches a terminal state before inspecting it.
ret.wait_until_finish()
if ret.state == PipelineState.DONE:
    print('Success!!!')
else:
    print('Error Running beam pipeline')