From acc5e716bb7331e4e4249bd3bd04d074ee7a3bd0 Mon Sep 17 00:00:00 2001
From: Benoit Orihuela
Date: Sat, 21 Sep 2024 08:05:49 +0200
Subject: [PATCH 1/5] Update README.md

---
 README.md | 1 -
 1 file changed, 1 deletion(-)

diff --git a/README.md b/README.md
index ffdf914..b639208 100644
--- a/README.md
+++ b/README.md
@@ -83,7 +83,6 @@ When set to `true` a single email with grouped errors will be sent. When set to
 ## Todo
 
 - Optional inclusion of FlowFile contents.
-- Add batching support in email provenance reporter.
 - Add testing.
 
 ## License

From 766d8373a598ef56332d8c9a34ebff2686ab8e19 Mon Sep 17 00:00:00 2001
From: Ranim Naimi <156652078+ranim-n@users.noreply.github.com>
Date: Fri, 27 Sep 2024 10:31:09 +0200
Subject: [PATCH 2/5] fix: sending an individual email when having one grouped error (#71)

---
 .../java/io/egm/nifi/reporting/EmailProvenanceReporter.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/EmailProvenanceReporter.java b/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/EmailProvenanceReporter.java
index cbf21e7..55535f1 100644
--- a/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/EmailProvenanceReporter.java
+++ b/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/EmailProvenanceReporter.java
@@ -328,7 +328,7 @@ private String composeMessageContent(final Map<String, Object> event, Boolean gr
                 .append("\tProcessor type: ").append(event.get("component_type")).append("\n")
                 .append("\tProcess group: ").append(event.get("process_group_name")).append("\n");
 
-        if (groupSimilarErrors) {
+        if (groupSimilarErrors && groupedEventsSize > 1) {
             message.append("\tTotal similar errors : ").append(groupedEventsSize).append("\n");
         }
 
@@ -417,7 +417,7 @@ public void sendErrorEmail(Map<String, Object> event, ReportingContext context,
             emailSubjectBuilder.append("[").append(subjectPrefix).append("] ");
         }
 
-        if (groupSimilarErrors) {
+        if (groupSimilarErrors && groupedEventsSize > 1) {
             emailSubjectBuilder.append(groupedEventsSize).append(" errors occurred in processor ")
                     .append(event.get("component_name")).append(" in process group ")
                     .append(event.get("process_group_name"));

From da5dd8917f3d6a80a5285af99119edcbcf56db94 Mon Sep 17 00:00:00 2001
From: Benoit Orihuela
Date: Mon, 14 Oct 2024 17:55:49 +0200
Subject: [PATCH 3/5] feat: add event id in ES fields

---
 .../io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java b/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java
index 21f04ac..9b13528 100644
--- a/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java
+++ b/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java
@@ -101,14 +101,15 @@
             }
 
             Map<String, Object> preparedEvent = new HashMap<>();
+            preparedEvent.put("event_id", event.get("event_id"));
             preparedEvent.put("event_time_millis", event.get("event_time"));
             preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
+            preparedEvent.put("event_type", event.get("event_type"));
             preparedEvent.put("component_type", event.get("component_type"));
             preparedEvent.put("component_url", event.get("component_url"));
             preparedEvent.put("component_name", event.get("component_name"));
             preparedEvent.put("process_group_name", event.get("process_group_name"));
             preparedEvent.put("process_group_id", event.get("process_group_id"));
-            preparedEvent.put("event_type", event.get("event_type"));
             preparedEvent.put("status", event.get("status"));
             preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
             preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));

From 0569290539db32649160c4d2863d580715cdfb38 Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Mon, 28 Oct 2024 16:29:32 +0100
Subject: [PATCH 4/5] chore(deps): bump com.fasterxml.jackson.core:jackson-databind (#73)

Bumps [com.fasterxml.jackson.core:jackson-databind](https://github.com/FasterXML/jackson) from 2.17.2 to 2.18.0.
- [Commits](https://github.com/FasterXML/jackson/commits)

---
updated-dependencies:
- dependency-name: com.fasterxml.jackson.core:jackson-databind
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot]
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
 nifi-provenance-reporting-tasks/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nifi-provenance-reporting-tasks/pom.xml b/nifi-provenance-reporting-tasks/pom.xml
index 17f20bc..bf34fad 100644
--- a/nifi-provenance-reporting-tasks/pom.xml
+++ b/nifi-provenance-reporting-tasks/pom.xml
@@ -52,7 +52,7 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
-            <version>2.17.2</version>
+            <version>2.18.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>

From 6431b6454e4f81b1778d8270c9b1b2cf73cc0280 Mon Sep 17 00:00:00 2001
From: Ranim Naimi <156652078+ranim-n@users.noreply.github.com>
Date: Tue, 5 Nov 2024 11:35:53 +0100
Subject: [PATCH 5/5] feat: add property to filter provenance events by processors types (#78)

* feat: add property to filter provenance events by processors types

* add default values + some minor fixes

* minor fix
---
 .../ElasticsearchProvenanceReporter.java | 102 ++++++++++++------
 1 file changed, 68 insertions(+), 34 deletions(-)

diff --git a/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java b/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java
index 9b13528..8195f37 100644
--- a/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java
+++ b/nifi-provenance-reporting-tasks/src/main/java/io/egm/nifi/reporting/ElasticsearchProvenanceReporter.java
@@ -36,6 +36,7 @@
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -43,6 +44,13 @@
 @Tags({"elasticsearch", "provenance"})
 @CapabilityDescription("A provenance reporting task that writes to Elasticsearch")
 public class ElasticsearchProvenanceReporter extends AbstractProvenanceReporter {
+
+    static final List<String> DEFAULT_PROCESSORS_TYPES_ALLOWLIST = Arrays.asList(
+            "DeleteSFTP", "ExecuteSQLRecord", "ExtendedValidateCsv", "FetchFTP",
+            "FetchSFTP", "FetchSmb", "GenerateFlowFile", "GetFTP", "GetSFTP", "GetSmbFile", "InvokeHTTP", "ListenFTP",
+            "ListFTP", "ListSFTP", "ListSmb", "PutFTP", "PutSFTP", "PutSmbFile"
+    );
+
     public static final PropertyDescriptor ELASTICSEARCH_URL = new PropertyDescriptor
             .Builder().name("Elasticsearch URL")
             .displayName("Elasticsearch URL")
@@ -62,6 +70,15 @@ public class ElasticsearchProvenanceReporter extends AbstractProvenanceReporter
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor PROCESSORS_TYPES_ALLOWLIST = new PropertyDescriptor
+            .Builder().name("Processors Types Allowlist")
+            .displayName("Processors Types Allowlist")
+            .description("Specifies a comma-separated list of processors types for which all provenance events " +
+                    "will be sent. If the processor type is not in the list, only error events will be sent.")
+            .defaultValue(String.join(",", DEFAULT_PROCESSORS_TYPES_ALLOWLIST))
+            .addValidator(StandardValidators.createListValidator(true, true, StandardValidators.NON_BLANK_VALIDATOR))
+            .build();
+
     private final Map<String, ElasticsearchClient> esClients = new HashMap<>();
 
     private final ObjectMapper objectMapper = new ObjectMapper();
@@ -85,6 +102,7 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         descriptors = super.getSupportedPropertyDescriptors();
         descriptors.add(ELASTICSEARCH_URL);
         descriptors.add(ELASTICSEARCH_INDEX);
+        descriptors.add(PROCESSORS_TYPES_ALLOWLIST);
         return descriptors;
     }
 
@@ -92,6 +110,9 @@ public void indexEvents(final List<Map<String, Object>> events, final ReportingC
         final String elasticsearchUrl = context.getProperty(ELASTICSEARCH_URL).getValue();
         final String elasticsearchIndex = context.getProperty(ELASTICSEARCH_INDEX).evaluateAttributeExpressions().getValue();
         final ElasticsearchClient client = getElasticsearchClient(elasticsearchUrl);
 
+        final List<String> processorTypesAllowlist =
+                Arrays.asList(context.getProperty(PROCESSORS_TYPES_ALLOWLIST).getValue().split(","));
+
         events.forEach(event -> {
             final String id = Long.toString((Long) event.get("event_id"));
@@ -99,42 +120,55 @@ public void indexEvents(final List<Map<String, Object>> events, final ReportingC
                 getLogger().warn("Provenance event has no process group or processor, ignoring");
                 return;
             }
-
-            Map<String, Object> preparedEvent = new HashMap<>();
-            preparedEvent.put("event_id", event.get("event_id"));
-            preparedEvent.put("event_time_millis", event.get("event_time"));
-            preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
-            preparedEvent.put("event_type", event.get("event_type"));
-            preparedEvent.put("component_type", event.get("component_type"));
-            preparedEvent.put("component_url", event.get("component_url"));
-            preparedEvent.put("component_name", event.get("component_name"));
-            preparedEvent.put("process_group_name", event.get("process_group_name"));
-            preparedEvent.put("process_group_id", event.get("process_group_id"));
-            preparedEvent.put("status", event.get("status"));
-            preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
-            preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
-            preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
-            preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
-            try {
-                preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
-                preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));
-            } catch (JsonProcessingException e) {
-                getLogger().error("Error while writing value of previous or updated attributes, ignoring them", e);
+            if(!event.containsKey("component_type")) {
+                getLogger().warn("Provenance event has no component type, ignoring");
+                return;
             }
-            if (event.containsKey("details"))
-                preparedEvent.put("details", event.get("details"));
-
-            final IndexRequest<Map<String, Object>> indexRequest = new
-                    IndexRequest.Builder<Map<String, Object>>()
-                    .index(elasticsearchIndex)
-                    .id(id)
-                    .document(preparedEvent)
-                    .build();
-            try {
-                client.index(indexRequest);
-            } catch (ElasticsearchException | IOException ex) {
-                getLogger().error("Error while indexing event {}", id, ex);
+
+            final String componentType = event.get("component_type").toString();
+            final String status = event.get("status").toString();
+
+            if(processorTypesAllowlist.contains(componentType)|| status.equals("Error")) {
+
+                Map<String, Object> preparedEvent = new HashMap<>();
+                preparedEvent.put("event_id", event.get("event_id"));
+                preparedEvent.put("event_time_millis", event.get("event_time"));
+                preparedEvent.put("event_time_iso_utc", event.get("event_time_iso_utc"));
+                preparedEvent.put("event_type", event.get("event_type"));
+                preparedEvent.put("component_type", event.get("component_type"));
+                preparedEvent.put("component_url", event.get("component_url"));
+                preparedEvent.put("component_name", event.get("component_name"));
+                preparedEvent.put("process_group_name", event.get("process_group_name"));
+                preparedEvent.put("process_group_id", event.get("process_group_id"));
+                preparedEvent.put("status", event.get("status"));
+                preparedEvent.put("download_input_content_uri", event.get("download_input_content_uri"));
+                preparedEvent.put("download_output_content_uri", event.get("download_output_content_uri"));
+                preparedEvent.put("view_input_content_uri", event.get("view_input_content_uri"));
+                preparedEvent.put("view_output_content_uri", event.get("view_output_content_uri"));
+                try {
+                    preparedEvent.put("updated_attributes", objectMapper.writeValueAsString(event.get("updated_attributes")));
+                    preparedEvent.put("previous_attributes", objectMapper.writeValueAsString(event.get("previous_attributes")));
+                } catch (JsonProcessingException e) {
+                    getLogger().error("Error while writing value of previous or updated attributes, ignoring them", e);
+                }
+                if (event.containsKey("details"))
+                    preparedEvent.put("details", event.get("details"));
+
+                final IndexRequest<Map<String, Object>> indexRequest = new
+                        IndexRequest.Builder<Map<String, Object>>()
+                        .index(elasticsearchIndex)
+                        .id(id)
+                        .document(preparedEvent)
+                        .build();
+                try {
+                    client.index(indexRequest);
+                } catch (ElasticsearchException | IOException ex) {
+                    getLogger().error("Error while indexing event {}", id, ex);
+                }
             }
+
         });
     }