diff --git a/nifi-standardize-date-nar/pom.xml b/nifi-standardize-date-nar/pom.xml index ec759be..27ae4f5 100644 --- a/nifi-standardize-date-nar/pom.xml +++ b/nifi-standardize-date-nar/pom.xml @@ -19,11 +19,11 @@ com.nineteen04labs nifi-standardize-date-bundle - 18.07.2 + 18.07.3 nifi-standardize-date-nar - 18.07.2 + 18.07.3 nar true @@ -34,7 +34,7 @@ com.nineteen04labs nifi-standardize-date-processors - 18.07.2 + 18.07.3 diff --git a/nifi-standardize-date-processors/pom.xml b/nifi-standardize-date-processors/pom.xml index 6d15642..1a402ec 100644 --- a/nifi-standardize-date-processors/pom.xml +++ b/nifi-standardize-date-processors/pom.xml @@ -20,7 +20,7 @@ com.nineteen04labs nifi-standardize-date-bundle - 18.07.2 + 18.07.3 nifi-standardize-date-processors @@ -40,16 +40,36 @@ commons-io 2.6 + org.apache.avro avro - 1.8.2 + 1.8.0 + system + ${project.basedir}/src/main/resources/avro-1.8.0-zolyfarkas.jar - org.apache.avro - avro-tools - 1.8.2 + org.codehaus.jackson + jackson-core-asl + 1.9.13 + + + org.codehaus.jackson + jackson-mapper-asl + 1.9.13 + + + com.google.guava + guava + 19.0 + + + org.xerial.snappy + snappy-java + 1.1.7.2 + + com.fasterxml.jackson.core jackson-databind diff --git a/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/standardizedate/StandardizeDate.java b/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/standardizedate/StandardizeDate.java index 4f908a8..685427e 100644 --- a/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/standardizedate/StandardizeDate.java +++ b/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/standardizedate/StandardizeDate.java @@ -52,6 +52,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; +import org.codehaus.jackson.node.NullNode; @Tags({"date", "time", "datetime", "standardize", "standardization"}) @CapabilityDescription("NiFi processor to standardize date fields in a FlowFile.") @@ -123,7 +124,7 @@ public void process(InputStream in, OutputStream out) throws IOException { for(Schema.Field f : schemaFields) { Schema.Field oldField = new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal()); newSchemaFields.add(oldField); - } + } in = FormatStream.avroToJson(in, schema); } @@ -146,15 +147,27 @@ public void process(InputStream in, OutputStream out) throws IOException { String invalidDate = jsonParser.getText(); String invalidDateFormat = invalidDates.get(tokenString); String standardizedDate; + try { - standardizedDate = ManipulateDate.standardize(invalidDate, invalidDateFormat, timezone); + if (invalidDate != "null") { + standardizedDate = ManipulateDate.standardize(invalidDate, invalidDateFormat, timezone); + jsonGen.writeString(standardizedDate); + } + else + jsonGen.writeNull(); } catch (Exception e) { throw new ProcessException("Couldn't convert '" + invalidDate + "' with format '" + invalidDateFormat + "' with timezone '" + timezone + "'"); } - jsonGen.writeString(standardizedDate); - if (flowFormat == "AVRO") - newSchemaFields.add(new Schema.Field(newFieldName, Schema.create(Type.STRING), null, "null")); + if (flowFormat == "AVRO") { + if (schema.getField(tokenString).schema().getType() == Schema.Type.UNION) { + ArrayList unionSchema = new ArrayList<>(); + unionSchema.add(Schema.create(Schema.Type.NULL)); + unionSchema.add(Schema.create(Schema.Type.STRING)); + newSchemaFields.add(new Schema.Field(newFieldName, Schema.createUnion(unionSchema), null, NullNode.getInstance())); + } else + newSchemaFields.add(new Schema.Field(newFieldName, Schema.create(Type.STRING), null, "null")); + } } } jsonGen.writeRaw("\n"); diff --git a/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/util/FormatStream.java b/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/util/FormatStream.java index 1f1451a..de424ac 100644 --- a/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/util/FormatStream.java +++ b/nifi-standardize-date-processors/src/main/java/com/nineteen04labs/processors/util/FormatStream.java @@ -27,14 +27,14 @@ import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileStream; import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.ExtendedGenericDatumWriter; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Decoder; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.io.EncoderFactory; -import org.apache.avro.io.JsonEncoder; +import org.apache.avro.io.ExtendedJsonDecoder; +import org.apache.avro.io.ExtendedJsonEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,10 +45,10 @@ public class FormatStream { public static InputStream avroToJson(InputStream in, Schema schema) throws IOException { GenericDatumReader reader = new GenericDatumReader(); DataFileStream streamReader = new DataFileStream(in, reader); - DatumWriter writer = new GenericDatumWriter(schema); + DatumWriter writer = new ExtendedGenericDatumWriter<>(schema); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos); + ExtendedJsonEncoder encoder = new ExtendedJsonEncoder(schema, baos); for (Object datum : streamReader) writer.write(datum, encoder); @@ -69,7 +69,7 @@ public static ByteArrayOutputStream jsonToAvro(ByteArrayOutputStream jsonStream, writer.setCodec(CodecFactory.snappyCodec()); writer.create(schema, baos); - Decoder decoder = DecoderFactory.get().jsonDecoder(schema, input); + Decoder decoder = new ExtendedJsonDecoder(schema, input); Object datum; while (true) { diff --git a/nifi-standardize-date-processors/src/main/resources/avro-1.8.0-zolyfarkas.jar b/nifi-standardize-date-processors/src/main/resources/avro-1.8.0-zolyfarkas.jar new file mode 100644 index 0000000..e833b09 Binary files /dev/null and b/nifi-standardize-date-processors/src/main/resources/avro-1.8.0-zolyfarkas.jar differ diff --git a/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateAvroTest.java b/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateAvroTest.java index 4f60c07..14029ef 100644 --- a/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateAvroTest.java +++ b/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateAvroTest.java @@ -10,7 +10,7 @@ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express/ or implied. * See the License for the specific language governing permissions and * limitations under the License. */ @@ -43,6 +43,7 @@ public void setSchema() throws IOException { public void testNoProcessing() throws IOException { runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "AVRO"); runner.setProperty(StandardizeDateProperties.AVRO_SCHEMA, avroSchema); + runner.setValidateExpressionUsage(false); runner.enqueue(unprocessedFile); @@ -59,8 +60,9 @@ public void testNoProcessing() throws IOException { public void testStandardization() throws IOException { runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "AVRO"); runner.setProperty(StandardizeDateProperties.AVRO_SCHEMA, avroSchema); - runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\"}"); + runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\",\"bad_date_union\":\"MM/dd/yy\"}"); runner.setProperty(StandardizeDateProperties.TIMEZONE, "America/Chicago"); + runner.setValidateExpressionUsage(false); runner.enqueue(unprocessedFile); diff --git a/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateJsonTest.java b/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateJsonTest.java index 01d8b8b..e04f0a0 100644 --- a/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateJsonTest.java +++ b/nifi-standardize-date-processors/src/test/java/com/nineteen04labs/processors/standardizedate/StandardizeDateJsonTest.java @@ -33,6 +33,7 @@ public class StandardizeDateJsonTest { @Test public void testNoProcessing() throws IOException { runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "JSON"); + runner.setValidateExpressionUsage(false); runner.enqueue(unprocessedFile); @@ -52,6 +53,7 @@ public void testStandardization() throws IOException { runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "JSON"); runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\"}"); runner.setProperty(StandardizeDateProperties.TIMEZONE, "America/Chicago"); + runner.setValidateExpressionUsage(false); runner.enqueue(unprocessedFile); @@ -71,6 +73,7 @@ public void testShortZoneId() throws IOException { runner.setProperty(StandardizeDateProperties.FLOW_FORMAT, "JSON"); runner.setProperty(StandardizeDateProperties.INVALID_DATES, "{\"bad_date\":\"MM/dd/yy\"}"); runner.setProperty(StandardizeDateProperties.TIMEZONE, "CST"); + runner.setValidateExpressionUsage(false); runner.enqueue(unprocessedFile); diff --git a/nifi-standardize-date-processors/src/test/resources/processed.json b/nifi-standardize-date-processors/src/test/resources/processed.json index dc692e4..df2374c 100644 --- a/nifi-standardize-date-processors/src/test/resources/processed.json +++ b/nifi-standardize-date-processors/src/test/resources/processed.json @@ -1,2 +1,2 @@ -{"status":"active","bad_date":"10/28/18","bad_date_standardized":"2018-10-28 05:00:00.000","location":{"state":"CA","country":"US"},"first_name":"Catherine ","last_name":"Easton","title":"Ms","created_at":"2015-09-03T01:23:20.605354","updated_at":"2016-09-18T20:05:20.747376","entity":"bed56c6310c6497b8c456b9244c2a427","position":"Chair, BILETA ","id":"ffabd37094c24626a6901a03799c35d2","card_number":"1234567891011123"} -{"status":"active","bad_date":"11/11/05","bad_date_standardized":"2005-11-11 06:00:00.000","location":{"state":"MO","country":"US"},"first_name":"James ","last_name":"Smith","title":"Mr","created_at":"2016-03-11T11:11:11.986462","updated_at":"2017-12-10T10:10:10.9047382","entity":"klo36c6310c6497b8c456b9244c0pl4n","position":"Chair, BILETA ","id":"ud96d37094c24626a6901a03798jnm5g","card_number":"9876543210987654"} +{"status":"active","bad_date":"10/28/18","bad_date_standardized":"2018-10-28 05:00:00.000","bad_date_union":{"string":"10/28/18"},"location":{"state":"CA","country":"US"},"first_name":"Catherine ","last_name":"Easton","title":"Ms","created_at":"2015-09-03T01:23:20.605354","updated_at":"2016-09-18T20:05:20.747376","entity":"bed56c6310c6497b8c456b9244c2a427","position":"Chair, BILETA ","id":"ffabd37094c24626a6901a03799c35d2","card_number":"1234567891011123"} +{"status":"active","bad_date":"11/11/05","bad_date_standardized":"2005-11-11 06:00:00.000","bad_date_union":null,"location":{"state":"MO","country":"US"},"first_name":"James ","last_name":"Smith","title":"Mr","created_at":"2016-03-11T11:11:11.986462","updated_at":"2017-12-10T10:10:10.9047382","entity":"klo36c6310c6497b8c456b9244c0pl4n","position":"Chair, BILETA ","id":"ud96d37094c24626a6901a03798jnm5g","card_number":"9876543210987654"} diff --git a/nifi-standardize-date-processors/src/test/resources/unprocessed.avro b/nifi-standardize-date-processors/src/test/resources/unprocessed.avro index 5b57378..7fd2887 100644 Binary files a/nifi-standardize-date-processors/src/test/resources/unprocessed.avro and b/nifi-standardize-date-processors/src/test/resources/unprocessed.avro differ diff --git a/nifi-standardize-date-processors/src/test/resources/unprocessed.avsc b/nifi-standardize-date-processors/src/test/resources/unprocessed.avsc index 0572f4f..1fd932d 100644 --- a/nifi-standardize-date-processors/src/test/resources/unprocessed.avsc +++ b/nifi-standardize-date-processors/src/test/resources/unprocessed.avsc @@ -7,6 +7,9 @@ }, { "name" : "bad_date", "type" : "string" + }, { + "name" : "bad_date_union", + "type" : ["null","string"] }, { "name" : "location", "type" : { diff --git a/nifi-standardize-date-processors/src/test/resources/unprocessed.json b/nifi-standardize-date-processors/src/test/resources/unprocessed.json index 8684d37..04d96fe 100644 --- a/nifi-standardize-date-processors/src/test/resources/unprocessed.json +++ b/nifi-standardize-date-processors/src/test/resources/unprocessed.json @@ -1,2 +1,2 @@ -{"status":"active","bad_date":"10/28/18","location":{"state":"CA","country":"US"},"first_name":"Catherine ","last_name":"Easton","title":"Ms","created_at":"2015-09-03T01:23:20.605354","updated_at":"2016-09-18T20:05:20.747376","entity":"bed56c6310c6497b8c456b9244c2a427","position":"Chair, BILETA ","id":"ffabd37094c24626a6901a03799c35d2","card_number":"1234567891011123"} -{"status":"active","bad_date":"11/11/05","location":{"state":"MO","country":"US"},"first_name":"James ","last_name":"Smith","title":"Mr","created_at":"2016-03-11T11:11:11.986462","updated_at":"2017-12-10T10:10:10.9047382","entity":"klo36c6310c6497b8c456b9244c0pl4n","position":"Chair, BILETA ","id":"ud96d37094c24626a6901a03798jnm5g","card_number":"9876543210987654"} \ No newline at end of file +{"status":"active","bad_date":"10/28/18","bad_date_union":{"string":"10/28/18"},"location":{"state":"CA","country":"US"},"first_name":"Catherine ","last_name":"Easton","title":"Ms","created_at":"2015-09-03T01:23:20.605354","updated_at":"2016-09-18T20:05:20.747376","entity":"bed56c6310c6497b8c456b9244c2a427","position":"Chair, BILETA ","id":"ffabd37094c24626a6901a03799c35d2","card_number":"1234567891011123"} +{"status":"active","bad_date":"11/11/05","bad_date_union":null,"location":{"state":"MO","country":"US"},"first_name":"James ","last_name":"Smith","title":"Mr","created_at":"2016-03-11T11:11:11.986462","updated_at":"2017-12-10T10:10:10.9047382","entity":"klo36c6310c6497b8c456b9244c0pl4n","position":"Chair, BILETA ","id":"ud96d37094c24626a6901a03798jnm5g","card_number":"9876543210987654"} \ No newline at end of file diff --git a/pom.xml b/pom.xml index ebea0da..01719f3 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ com.nineteen04labs nifi-standardize-date-bundle - 18.07.2 + 18.07.3 pom