Skip to content

Commit

Permalink
Use custom avro library to support union types
Browse files Browse the repository at this point in the history
  • Loading branch information
cavemandaveman committed Jul 18, 2018
1 parent 4024cc7 commit b06e3e3
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 26 deletions.
6 changes: 3 additions & 3 deletions nifi-standardize-date-nar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
<parent>
<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-standardize-date-bundle</artifactId>
<version>18.07.2</version>
<version>18.07.3</version>
</parent>

<artifactId>nifi-standardize-date-nar</artifactId>
<version>18.07.2</version>
<version>18.07.3</version>
<packaging>nar</packaging>
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
Expand All @@ -34,7 +34,7 @@
<dependency>
<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-standardize-date-processors</artifactId>
<version>18.07.2</version>
<version>18.07.3</version>
</dependency>
</dependencies>

Expand Down
30 changes: 25 additions & 5 deletions nifi-standardize-date-processors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-standardize-date-bundle</artifactId>
<version>18.07.2</version>
<version>18.07.3</version>
</parent>

<artifactId>nifi-standardize-date-processors</artifactId>
Expand All @@ -40,16 +40,36 @@
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<!-- Import custom avro and its dependencies to handle union file types -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
<version>1.8.0</version>
<scope>system</scope>
<systemPath>${project.basedir}/src/main/resources/avro-1.8.0-zolyfarkas.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-tools</artifactId>
<version>1.8.2</version>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.7.2</version>
</dependency>
<!-- End of custom avro and dependencies -->
<!-- Need newer Jackson lib for JsonFactory -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.codehaus.jackson.node.NullNode;

@Tags({"date", "time", "datetime", "standardize", "standardization"})
@CapabilityDescription("NiFi processor to standardize date fields in a FlowFile.")
Expand Down Expand Up @@ -123,7 +124,7 @@ public void process(InputStream in, OutputStream out) throws IOException {
for(Schema.Field f : schemaFields) {
Schema.Field oldField = new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal());
newSchemaFields.add(oldField);
}
}
in = FormatStream.avroToJson(in, schema);
}

Expand All @@ -146,15 +147,27 @@ public void process(InputStream in, OutputStream out) throws IOException {
String invalidDate = jsonParser.getText();
String invalidDateFormat = invalidDates.get(tokenString);
String standardizedDate;

try {
standardizedDate = ManipulateDate.standardize(invalidDate, invalidDateFormat, timezone);
if (invalidDate != "null") {
standardizedDate = ManipulateDate.standardize(invalidDate, invalidDateFormat, timezone);
jsonGen.writeString(standardizedDate);
}
else
jsonGen.writeNull();
} catch (Exception e) {
throw new ProcessException("Couldn't convert '" + invalidDate + "' with format '" + invalidDateFormat + "' with timezone '" + timezone + "'");
}
jsonGen.writeString(standardizedDate);

if (flowFormat == "AVRO")
newSchemaFields.add(new Schema.Field(newFieldName, Schema.create(Type.STRING), null, "null"));
if (flowFormat == "AVRO") {
if (schema.getField(tokenString).schema().getType() == Schema.Type.UNION) {
ArrayList<Schema> unionSchema = new ArrayList<>();
unionSchema.add(Schema.create(Schema.Type.NULL));
unionSchema.add(Schema.create(Schema.Type.STRING));
newSchemaFields.add(new Schema.Field(newFieldName, Schema.createUnion(unionSchema), null, NullNode.getInstance()));
} else
newSchemaFields.add(new Schema.Field(newFieldName, Schema.create(Type.STRING), null, "null"));
}
}
}
jsonGen.writeRaw("\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.ExtendedGenericDatumWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.io.ExtendedJsonDecoder;
import org.apache.avro.io.ExtendedJsonEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,10 +45,10 @@ public class FormatStream {
public static InputStream avroToJson(InputStream in, Schema schema) throws IOException {
GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
DataFileStream<Object> streamReader = new DataFileStream<Object>(in, reader);
DatumWriter<Object> writer = new GenericDatumWriter<Object>(schema);
DatumWriter<Object> writer = new ExtendedGenericDatumWriter<>(schema);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos);
ExtendedJsonEncoder encoder = new ExtendedJsonEncoder(schema, baos);

for (Object datum : streamReader)
writer.write(datum, encoder);
Expand All @@ -69,7 +69,7 @@ public static ByteArrayOutputStream jsonToAvro(ByteArrayOutputStream jsonStream,
writer.setCodec(CodecFactory.snappyCodec());
writer.create(schema, baos);

Decoder decoder = DecoderFactory.get().jsonDecoder(schema, input);
Decoder decoder = new ExtendedJsonDecoder(schema, input);
Object datum;

while (true) {
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express/ or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
Expand Down Expand Up @@ -43,6 +43,7 @@ public void setSchema() throws IOException {
public void testNoProcessing() throws IOException {
runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "AVRO");
runner.setProperty(StandardizeDateProperties.AVRO_SCHEMA, avroSchema);
runner.setValidateExpressionUsage(false);

runner.enqueue(unprocessedFile);

Expand All @@ -59,8 +60,9 @@ public void testNoProcessing() throws IOException {
public void testStandardization() throws IOException {
runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "AVRO");
runner.setProperty(StandardizeDateProperties.AVRO_SCHEMA, avroSchema);
runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\"}");
runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\",\"bad_date_union\":\"MM/dd/yy\"}");
runner.setProperty(StandardizeDateProperties.TIMEZONE, "America/Chicago");
runner.setValidateExpressionUsage(false);

runner.enqueue(unprocessedFile);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class StandardizeDateJsonTest {
@Test
public void testNoProcessing() throws IOException {
runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "JSON");
runner.setValidateExpressionUsage(false);

runner.enqueue(unprocessedFile);

Expand All @@ -52,6 +53,7 @@ public void testStandardization() throws IOException {
runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "JSON");
runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\"}");
runner.setProperty(StandardizeDateProperties.TIMEZONE, "America/Chicago");
runner.setValidateExpressionUsage(false);

runner.enqueue(unprocessedFile);

Expand All @@ -71,6 +73,7 @@ public void testShortZoneId() throws IOException {
runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "JSON");
runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\"}");
runner.setProperty(StandardizeDateProperties.TIMEZONE, "CST");
runner.setValidateExpressionUsage(false);

runner.enqueue(unprocessedFile);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"status":"active","bad_date":"10/28/18","bad_date_standardized":"2018-10-28 05:00:00.000","location":{"state":"CA","country":"US"},"first_name":"Catherine ","last_name":"Easton","title":"Ms","created_at":"2015-09-03T01:23:20.605354","updated_at":"2016-09-18T20:05:20.747376","entity":"bed56c6310c6497b8c456b9244c2a427","position":"Chair, BILETA ","id":"ffabd37094c24626a6901a03799c35d2","card_number":"1234567891011123"}
{"status":"active","bad_date":"11/11/05","bad_date_standardized":"2005-11-11 06:00:00.000","location":{"state":"MO","country":"US"},"first_name":"James ","last_name":"Smith","title":"Mr","created_at":"2016-03-11T11:11:11.986462","updated_at":"2017-12-10T10:10:10.9047382","entity":"klo36c6310c6497b8c456b9244c0pl4n","position":"Chair, BILETA ","id":"ud96d37094c24626a6901a03798jnm5g","card_number":"9876543210987654"}
{"status":"active","bad_date":"10/28/18","bad_date_standardized":"2018-10-28 05:00:00.000","bad_date_union":{"string":"10/28/18"},"location":{"state":"CA","country":"US"},"first_name":"Catherine ","last_name":"Easton","title":"Ms","created_at":"2015-09-03T01:23:20.605354","updated_at":"2016-09-18T20:05:20.747376","entity":"bed56c6310c6497b8c456b9244c2a427","position":"Chair, BILETA ","id":"ffabd37094c24626a6901a03799c35d2","card_number":"1234567891011123"}
{"status":"active","bad_date":"11/11/05","bad_date_standardized":"2005-11-11 06:00:00.000","bad_date_union":null,"location":{"state":"MO","country":"US"},"first_name":"James ","last_name":"Smith","title":"Mr","created_at":"2016-03-11T11:11:11.986462","updated_at":"2017-12-10T10:10:10.9047382","entity":"klo36c6310c6497b8c456b9244c0pl4n","position":"Chair, BILETA ","id":"ud96d37094c24626a6901a03798jnm5g","card_number":"9876543210987654"}
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
}, {
"name" : "bad_date",
"type" : "string"
}, {
"name" : "bad_date_union",
"type" : ["null","string"]
}, {
"name" : "location",
"type" : {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
{"status":"active","bad_date":"10/28/18","location":{"state":"CA","country":"US"},"first_name":"Catherine ","last_name":"Easton","title":"Ms","created_at":"2015-09-03T01:23:20.605354","updated_at":"2016-09-18T20:05:20.747376","entity":"bed56c6310c6497b8c456b9244c2a427","position":"Chair, BILETA ","id":"ffabd37094c24626a6901a03799c35d2","card_number":"1234567891011123"}
{"status":"active","bad_date":"11/11/05","location":{"state":"MO","country":"US"},"first_name":"James ","last_name":"Smith","title":"Mr","created_at":"2016-03-11T11:11:11.986462","updated_at":"2017-12-10T10:10:10.9047382","entity":"klo36c6310c6497b8c456b9244c0pl4n","position":"Chair, BILETA ","id":"ud96d37094c24626a6901a03798jnm5g","card_number":"9876543210987654"}
{"status":"active","bad_date":"10/28/18","bad_date_union":{"string":"10/28/18"},"location":{"state":"CA","country":"US"},"first_name":"Catherine ","last_name":"Easton","title":"Ms","created_at":"2015-09-03T01:23:20.605354","updated_at":"2016-09-18T20:05:20.747376","entity":"bed56c6310c6497b8c456b9244c2a427","position":"Chair, BILETA ","id":"ffabd37094c24626a6901a03799c35d2","card_number":"1234567891011123"}
{"status":"active","bad_date":"11/11/05","bad_date_union":null,"location":{"state":"MO","country":"US"},"first_name":"James ","last_name":"Smith","title":"Mr","created_at":"2016-03-11T11:11:11.986462","updated_at":"2017-12-10T10:10:10.9047382","entity":"klo36c6310c6497b8c456b9244c0pl4n","position":"Chair, BILETA ","id":"ud96d37094c24626a6901a03798jnm5g","card_number":"9876543210987654"}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

<groupId>com.nineteen04labs</groupId>
<artifactId>nifi-standardize-date-bundle</artifactId>
<version>18.07.2</version>
<version>18.07.3</version>
<packaging>pom</packaging>

<modules>
Expand Down

0 comments on commit b06e3e3

Please sign in to comment.