From 665f7117961318e9ef9bfe5d9f4d9be052ec508c Mon Sep 17 00:00:00 2001 From: SowmyaDixit Date: Fri, 6 Dec 2019 19:37:04 +0530 Subject: [PATCH] Issue #0000 refactor: Fix secor process failure for empty message --- .../pinterest/secor/consumer/Consumer.java | 4 +- .../parser/PatternDateMessageParser.java | 62 +++++++++++-------- 2 files changed, 38 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/pinterest/secor/consumer/Consumer.java b/src/main/java/com/pinterest/secor/consumer/Consumer.java index 8ec3dbe4e..71428adb8 100644 --- a/src/main/java/com/pinterest/secor/consumer/Consumer.java +++ b/src/main/java/com/pinterest/secor/consumer/Consumer.java @@ -153,7 +153,7 @@ protected boolean consumeNextMessage() { if (mUnparsableMessages > MAX_UNPARSABLE_MESSAGES) { throw new RuntimeException("Failed to parse message " + rawMessage, e); } - LOG.warn("Failed to parse message {}", rawMessage, e); + LOG.warn("Consumer: Failed to parse message {}", rawMessage, e); } if (parsedMessage != null) { @@ -163,7 +163,7 @@ protected boolean consumeNextMessage() { mMetricCollector.metric("consumer.message_size_bytes", rawMessage.getPayload().length, rawMessage.getTopic()); mMetricCollector.increment("consumer.throughput_bytes", rawMessage.getPayload().length, rawMessage.getTopic()); } catch (Exception e) { - throw new RuntimeException("Failed to write message " + parsedMessage, e); + throw new RuntimeException("Consumer: Failed to write message " + parsedMessage, e); } } } diff --git a/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java b/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java index 1c551ccf8..1bba9254d 100644 --- a/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java +++ b/src/main/java/com/pinterest/secor/parser/PatternDateMessageParser.java @@ -73,43 +73,53 @@ public PatternDateMessageParser(SecorConfig config) { @Override public String[] extractPartitions(Message message) { - JSONObject jsonObject = (JSONObject) JSONValue.parse(message.getPayload()); boolean prefixEnabled = mConfig.isPartitionPrefixEnabled(); - String result[] = { prefixEnabled ? partitionPrefixMap.get("DEFAULT") + defaultDate : defaultDate }; - if (jsonObject != null) { - Object fieldValue = jsonObject.get(mConfig.getMessageTimestampName()); - if (fieldValue == null) - fieldValue = jsonObject.get(mConfig.getFallbackMessageTimestampName()); - if (fieldValue == null) - fieldValue = System.currentTimeMillis(); + String result[] = {prefixEnabled ? partitionPrefixMap.get("DEFAULT") + defaultDate : defaultDate}; + try { + JSONObject jsonObject = (JSONObject) JSONValue.parse(message.getPayload()); + if (jsonObject != null) { + Object fieldValue = jsonObject.get(mConfig.getMessageTimestampName()); + if (fieldValue == null) + fieldValue = jsonObject.get(mConfig.getFallbackMessageTimestampName()); + if (fieldValue == null) + fieldValue = System.currentTimeMillis(); - Object eventValue = jsonObject.get(mConfig.getPartitionPrefixIdentifier()); - Object inputPattern = mConfig.getMessageTimestampInputPattern(); - if (inputPattern != null) { - try { + Object eventValue = jsonObject.get(mConfig.getPartitionPrefixIdentifier()); + Object inputPattern = mConfig.getMessageTimestampInputPattern(); + if (inputPattern != null) { + try { /* SimpleDateFormat outputFormatter = new SimpleDateFormat( StringUtils.defaultIfBlank(mConfig.getPartitionOutputDtFormat(), defaultFormatter)); */ - Date dateFormat; - if (fieldValue instanceof Number) { - dateFormat = new Date(((Number) fieldValue).longValue()); - } else { - SimpleDateFormat inputFormatter = new SimpleDateFormat(inputPattern.toString()); - inputFormatter.setTimeZone(messageTimeZone); - dateFormat = inputFormatter.parse(fieldValue.toString()); + Date dateFormat; + if (fieldValue instanceof Number) { + dateFormat = new Date(((Number) fieldValue).longValue()); + } else { + SimpleDateFormat inputFormatter = new SimpleDateFormat(inputPattern.toString()); + inputFormatter.setTimeZone(messageTimeZone); + dateFormat = inputFormatter.parse(fieldValue.toString()); + } + result[0] = prefixEnabled ? getPrefix(eventValue.toString()) + outputFormatter.format(dateFormat) + : outputFormatter.format(dateFormat); + return result; + } catch (Exception e) { + e.printStackTrace(); + LOG.info("PatternDateMessageParser: Exception while parsing date " + e.getMessage()); + LOG.warn("Unable to get path: " + e.getMessage()); } - result[0] = prefixEnabled ? getPrefix(eventValue.toString()) + outputFormatter.format(dateFormat) - : outputFormatter.format(dateFormat); - return result; - } catch (Exception e) { - e.printStackTrace(); - LOG.warn("Unable to get path: " + e.getMessage()); } + } else { + LOG.info("PatternDateMessageParser: Unable to parse json object " + jsonObject); } + return result; + } + catch (Exception e) { + e.printStackTrace(); + LOG.info("PatternDateMessageParser: Exception while parsing date " + e.getMessage()); + return result; } - return result; } private String getPrefix(String prefixIdentifier) {