diff --git a/kite-morphlines/kite-morphlines-core/src/main/java/org/kitesdk/morphline/stdlib/ConvertTimestampBuilder.java b/kite-morphlines/kite-morphlines-core/src/main/java/org/kitesdk/morphline/stdlib/ConvertTimestampBuilder.java
index ef963fbfc3..53ded3c1fc 100644
--- a/kite-morphlines/kite-morphlines-core/src/main/java/org/kitesdk/morphline/stdlib/ConvertTimestampBuilder.java
+++ b/kite-morphlines/kite-morphlines-core/src/main/java/org/kitesdk/morphline/stdlib/ConvertTimestampBuilder.java
@@ -39,6 +39,8 @@
 import com.google.common.base.Joiner;
 import com.typesafe.config.Config;
 
+import java.util.AbstractMap.SimpleImmutableEntry;
+
 /**
  * Command that converts the timestamps in a given field from one of a set of input date formats (in
  * an input timezone) to an output date format (in an output timezone), while respecting daylight
@@ -60,12 +62,14 @@ public Command build(Config config, Command parent, Command child, MorphlineCont
   ///////////////////////////////////////////////////////////////////////////////
   // Nested classes:
   ///////////////////////////////////////////////////////////////////////////////
-  private static final class ConvertTimestamp extends AbstractCommand {
+  public static final class ConvertTimestamp extends AbstractCommand {
 
     private final String fieldName;
-    private final List<SimpleDateFormat> inputFormats = new ArrayList<SimpleDateFormat>();
+    private final List<SimpleImmutableEntry<SimpleDateFormat, Boolean>> inputFormats = new ArrayList<SimpleImmutableEntry<SimpleDateFormat, Boolean>>(); // value: true if "yyyy" was prepended to the pattern
     private final SimpleDateFormat outputFormat;
     private final String inputFormatsDebugString; // cached
+    private final int insertYearMonthOffset; // shifts the twelve month window used to infer a missing year
+    private final int insertYearOffset; // added to the current year before the window is applied
 
     private static final String NATIVE_SOLR_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"; // e.g. 2007-04-26T08:05:04.789Z
     private static final SimpleDateFormat UNIX_TIME_IN_MILLIS = new SimpleDateFormat("'unixTimeInMillis'");
@@ -75,20 +79,31 @@ private static final class ConvertTimestamp extends AbstractCommand {
       DateUtil.DEFAULT_DATE_FORMATS.add(0, NATIVE_SOLR_FORMAT);
     }
 
-    public ConvertTimestamp(CommandBuilder builder, Config config, Command parent, Command child, MorphlineContext context) {
+    ConvertTimestamp(CommandBuilder builder, Config config, Command parent, Command child, MorphlineContext context) {
       super(builder, config, parent, child, context);
 
       this.fieldName = getConfigs().getString(config, "field", Fields.TIMESTAMP);
       TimeZone inputTimeZone = getTimeZone(getConfigs().getString(config, "inputTimezone", "UTC"));
       Locale inputLocale = getLocale(getConfigs().getString(config, "inputLocale", ""));
+
+      boolean insertYear = getConfigs().getBoolean(config, "insertMissingYear", false);
+      //Defaults to -5 which gives a rolling -11 through +1 month offset (assumes historic messages)
+      insertYearMonthOffset = getConfigs().getInt(config, "insertMissingYearMonthOffset", -5);
+      insertYearOffset = getConfigs().getInt(config, "insertMissingYearOffset", 0);
+
       for (String inputFormat : getConfigs().getStringList(config, "inputFormats", DateUtil.DEFAULT_DATE_FORMATS)) {
         SimpleDateFormat dateFormat = getUnixTimeFormat(inputFormat, inputTimeZone);
+        boolean yearRequired = false;
         if (dateFormat == null) {
+          if (insertYear && !inputFormat.contains("yy")) {
+            inputFormat = "yyyy" + inputFormat; // doProcess prefixes the value with the assumed year to match
+            yearRequired = true;
+          }
           dateFormat = new SimpleDateFormat(inputFormat, inputLocale);
           dateFormat.setTimeZone(inputTimeZone);
           dateFormat.set2DigitYearStart(DateUtil.DEFAULT_TWO_DIGIT_YEAR_START);
         }
-        this.inputFormats.add(dateFormat);
+        this.inputFormats.add(new SimpleImmutableEntry<SimpleDateFormat, Boolean>(dateFormat, yearRequired));
       }
       TimeZone outputTimeZone = getTimeZone(getConfigs().getString(config, "outputTimezone", "UTC"));
       Locale outputLocale = getLocale(getConfigs().getString(config, "outputLocale", ""));
@@ -99,19 +114,23 @@ public ConvertTimestamp(CommandBuilder builder, Config config, Command parent, C
         dateFormat.setTimeZone(outputTimeZone);
       }
       this.outputFormat = dateFormat;
-      validateArguments();
-      
+
       List<String> inputFormatsStringList = new ArrayList<String>();
-      for (SimpleDateFormat inputFormat : inputFormats) {
+      for (SimpleImmutableEntry<SimpleDateFormat, Boolean> inputFormat : inputFormats) {
         // SimpleDateFormat.toString() doesn't print anything useful
-        inputFormatsStringList.add(inputFormat.toPattern());
+        inputFormatsStringList.add(inputFormat.getKey().toPattern());
       }
       this.inputFormatsDebugString = inputFormatsStringList.toString();
+
+      validateArguments();
+
+
 
       if (LOG.isTraceEnabled()) {
         LOG.trace("inputFormatsDebugString: {}", inputFormatsDebugString);
         LOG.trace("availableTimeZoneIDs: {}", Joiner.on("\n").join(TimeZone.getAvailableIDs()));
         LOG.trace("availableLocales: {}", Joiner.on("\n").join(Locale.getAvailableLocales()));
+        LOG.trace("insertMissingYear: {}", insertYear);
       }
     }
 
@@ -123,7 +142,9 @@ protected boolean doProcess(Record record) {
       while (iter.hasNext()) {
         String timestamp = iter.next().toString();
         boolean foundMatchingFormat = false;
-        for (SimpleDateFormat inputFormat : inputFormats) {
+        for (SimpleImmutableEntry<SimpleDateFormat, Boolean> inputFormatPair : inputFormats) {
+          SimpleDateFormat inputFormat = inputFormatPair.getKey();
+          boolean yearRequired = inputFormatPair.getValue();
           Date date;
           boolean isUnixTime;
           if (inputFormat == UNIX_TIME_IN_MILLIS) {
@@ -135,7 +156,22 @@ protected boolean doProcess(Record record) {
           } else {
             isUnixTime = false;
             pos.setIndex(0);
-            date = inputFormat.parse(timestamp, pos);
+            if (yearRequired) {
+              // Parse a year-prefixed copy: timestamp itself must stay untouched for the next input format.
+              int targetYear = Calendar.getInstance().get(Calendar.YEAR) + insertYearOffset;
+              String withYear = targetYear + timestamp;
+              date = inputFormat.parse(withYear, pos);
+              if (date != null && pos.getIndex() == withYear.length()) {
+                date = DateUtil.insertYear(date, new Date(), insertYearMonthOffset, targetYear, inputFormat.getTimeZone());
+                // The completeness check below measures against the unprefixed timestamp.
+                pos.setIndex(timestamp.length());
+              } else {
+                // No complete match: skip insertYear, which cannot handle null, and try the next format.
+                date = null;
+              }
+            } else {
+              date = inputFormat.parse(timestamp, pos);
+            }
           }
           if (date != null && (isUnixTime || pos.getIndex() == timestamp.length())) {
             String result;
@@ -208,6 +244,7 @@ private Locale getLocale(String name) {
     }
 
 
+
     ///////////////////////////////////////////////////////////////////////////////
     // Nested classes:
     ///////////////////////////////////////////////////////////////////////////////
@@ -230,7 +267,7 @@ private Locale getLocale(String name) {
     /**
      * This class has some code from HttpClient DateUtil and Solrj DateUtil.
      */
-    private static final class DateUtil {
+    public static final class DateUtil {
       //start HttpClient
       /**
        * Date format pattern used to parse HTTP date headers in RFC 1123 format.
@@ -259,8 +296,6 @@ private static final class DateUtil {
         DEFAULT_TWO_DIGIT_YEAR_START = calendar.getTime();
       }
 
-//    private static final TimeZone GMT = TimeZone.getTimeZone("GMT");
-
       //end HttpClient
 
       //---------------------------------------------------------------------------------------
@@ -280,7 +315,72 @@ private static final class DateUtil {
         DEFAULT_DATE_FORMATS.addAll(DateUtil.DEFAULT_HTTP_CLIENT_PATTERNS);
       }
 
+      /**
+       * Works out which year a timestamp that was written without one most likely belongs to.
+       * <p>
+       * The date is compared with {@code currentDate} (moved into {@code targetYear}) using a twelve
+       * month window that reaches {@code 6 - monthOffset} months into the past and
+       * {@code 6 + monthOffset} months into the future. A date beyond the future edge is moved back one
+       * year; a date beyond the past edge is moved forward one year. With an offset of -5 the window
+       * runs from eleven months back to one month ahead.
+       * <p>
+       * Works around the fact that SimpleDateFormat doesn't handle a missing year.
+       * Code inspired by Flume SyslogParser.java
+       * https://github.com/apache/flume/blob/trunk/flume-ng-core/src/main/java/org/apache/flume/source/SyslogParser.java
+       *
+       * @param inputDate the parsed timestamp, normally carrying {@code targetYear} as a provisional year
+       * @param currentDate the reference instant, normally the current time
+       * @param monthOffset shifts the window; negative values extend it into the past
+       * @param targetYear the provisional year
+       * @param tz the time zone used for the calendar arithmetic
+       * @return {@code inputDate}, moved by one year if it fell outside the window
+       */
+      public static Date insertYear(Date inputDate, Date currentDate, int monthOffset, int targetYear, TimeZone tz) {
+        // Bind every calendar to tz before setting its time: changing the zone after setTime() and then
+        // calling set() makes Calendar rebuild the instant from fields computed for the previous zone.
+        Calendar cal = Calendar.getInstance(tz);
+        cal.setTime(inputDate);
+
+        //There are 12 months in a year. We offer a sliding window, for working out whether the parsed date falls within
+        //the window (for dealing with year rollover issues).
+        //Compute the upper and lower bound by moving +6 and -6 by the offset.
+        int upperBound = monthOffset + 6;
+        int lowerBound = monthOffset - 6;
+
+        //We're now going to check to see whether the date falls outside of the
+        //upper or lower bounds by intentionally creating the wrong date and seeing
+        //whether that falls in the past (or future)
+        Calendar calMinusUpperBMonths = Calendar.getInstance(tz);
+        calMinusUpperBMonths.setTime(inputDate);
+        calMinusUpperBMonths.set(Calendar.YEAR, targetYear);
+        calMinusUpperBMonths.add(Calendar.MONTH, -upperBound);
+
+        Calendar calPlusLowerBMonths = Calendar.getInstance(tz);
+        calPlusLowerBMonths.setTime(inputDate);
+        calPlusLowerBMonths.set(Calendar.YEAR, targetYear);
+        calPlusLowerBMonths.add(Calendar.MONTH, -lowerBound);
+
+        Calendar calReferencePoint = Calendar.getInstance(tz);
+        calReferencePoint.setTime(currentDate);
+        calReferencePoint.set(Calendar.YEAR, targetYear);
+
+        if (cal.getTimeInMillis() > calReferencePoint.getTimeInMillis() &&
+            calMinusUpperBMonths.getTimeInMillis() > calReferencePoint.getTimeInMillis()) {
+          //Date as it stands is in the future and also more than (upper bound) months in the future
+          //Need to roll back a year
+          cal.add(Calendar.YEAR, -1);
+        } else if (cal.getTimeInMillis() < calReferencePoint.getTimeInMillis() &&
+            calPlusLowerBMonths.getTimeInMillis() < calReferencePoint.getTimeInMillis()) {
+          //Date as it stands is in the past and indeed more than (lower bound) months in the past
+          //Need to roll forward a year
+          cal.add(Calendar.YEAR, 1);
+        }
+        // Else it's in the middle and no modification required
+
+        return cal.getTime();
+      }
+
     }
   }
-
-}
+
+}
\ No newline at end of file
diff --git a/kite-morphlines/kite-morphlines-core/src/test/java/org/kitesdk/morphline/stdlib/ConvertTimestampInsertYearTest.java b/kite-morphlines/kite-morphlines-core/src/test/java/org/kitesdk/morphline/stdlib/ConvertTimestampInsertYearTest.java
new file mode 100644
index 0000000000..c2a8258141
--- /dev/null
+++ b/kite-morphlines/kite-morphlines-core/src/test/java/org/kitesdk/morphline/stdlib/ConvertTimestampInsertYearTest.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2016 Cloudera Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kitesdk.morphline.stdlib;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.kitesdk.morphline.api.Record;
+import org.kitesdk.morphline.stdlib.ConvertTimestampBuilder.ConvertTimestamp.DateUtil;
+import org.junit.Test;
+import org.kitesdk.morphline.api.AbstractMorphlineTest;
+import org.kitesdk.morphline.api.ExceptionHandler;
+import org.kitesdk.morphline.api.MorphlineContext;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.health.HealthCheck;
+import com.codahale.metrics.health.HealthCheckRegistry;
+
+public class ConvertTimestampInsertYearTest extends AbstractMorphlineTest {
+
+  private void processAndVerifySuccess(Record input, Record expected) {
+    processAndVerifySuccess(input, expected, true);
+  }
+
+  private void processAndVerifySuccess(Record input, Record expected, boolean isSame) {
+    collector.reset();
+    startSession();
+    assertEquals(1, collector.getNumStartEvents());
+    assertTrue(morphline.process(input));
+    assertEquals(expected, collector.getFirstRecord());
+    if (isSame) {
+      assertSame(input, collector.getFirstRecord());
+    } else {
+      assertNotSame(input, collector.getFirstRecord());
+    }
+  }
+
+  @Test
+  public void testMorphlineContext() throws Exception {
+    ExceptionHandler ex = new ExceptionHandler() {
+      @Override
+      public void handleException(Throwable t, Record record) {
+        throw new RuntimeException(t);
+      }
+    };
+
+    MetricRegistry metricRegistry = new MetricRegistry();
+    metricRegistry.register("myCounter", new Counter());
+
+    HealthCheckRegistry healthChecks = new HealthCheckRegistry();
+    healthChecks.register("foo", new HealthCheck() {
+      @Override
+      protected Result check() throws Exception {
+        return Result.healthy("flawless");
+      }
+    });
+
+    Map<String, Object> settings = new HashMap<String, Object>();
+
+    MorphlineContext ctx = new MorphlineContext.Builder()
+      .setSettings(settings)
+      .setExceptionHandler(ex)
+      .setHealthCheckRegistry(healthChecks)
+      .setMetricRegistry(metricRegistry)
+      .build();
+
+    assertSame(settings, ctx.getSettings());
+    assertSame(ex, ctx.getExceptionHandler());
+    assertSame(metricRegistry, ctx.getMetricRegistry());
+    assertSame(healthChecks, ctx.getHealthCheckRegistry());
+    ctx.getHealthCheckRegistry().runHealthChecks();
+
+    assertEquals(0, new MorphlineContext.Builder().build().getSettings().size());
+  }
+
+
+  @Test
+  public void testConvertTimestampWithMissingYear() throws Exception {
+    //Testing three configurations: ts1 uses a month offset of 0 (6 months either side of now), ts2 uses the
+    //default month offset of -5 (11 months in the past, 1 month in the future), ts3 adds a year offset of -5.
+    //Loop through 12 months, build the timestamps and test that Morphline successfully infers the correct year.
+    for (int i=-5; i<=6; i++) {
+      SimpleDateFormat inputFormat = new SimpleDateFormat("MMM dd HH:mm:ss");
+      SimpleDateFormat outputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+
+      Date date = new Date(System.currentTimeMillis());
+      Calendar calTs1 = Calendar.getInstance();
+      calTs1.setTime(date);
+      calTs1.setTimeZone(TimeZone.getTimeZone("UTC"));
+      calTs1.add(Calendar.MONTH, i);
+
+      Calendar calTs2 = Calendar.getInstance();
+      calTs2.setTime(date);
+      calTs2.setTimeZone(TimeZone.getTimeZone("UTC"));
+      calTs2.add(Calendar.MONTH, i-5);
+
+      Calendar calTs3 = Calendar.getInstance();
+      calTs3.setTime(date);
+      calTs3.setTimeZone(TimeZone.getTimeZone("UTC"));
+      calTs3.add(Calendar.MONTH, i);
+      calTs3.add(Calendar.YEAR, -5);
+
+      //Small tweak to avoid the upper edge of the window ticking over by a few seconds between now
+      //and when the insertYear method actually runs
+      if (i==6) {
+        calTs1.add(Calendar.DAY_OF_MONTH, -1);
+        calTs2.add(Calendar.DAY_OF_MONTH, -1);
+        calTs3.add(Calendar.DAY_OF_MONTH, -1);
+      }
+
+      morphline = createMorphline("test-morphlines/convertTimestampInsertYear");
+
+      Record record = new Record();
+      record.put("ts1", inputFormat.format(calTs1.getTime()));
+      record.put("ts2", inputFormat.format(calTs2.getTime()));
+      record.put("ts3", inputFormat.format(calTs3.getTime()));
+
+      Record expected = new Record();
+      expected.put("ts1", outputFormat.format(calTs1.getTime()));
+      expected.put("ts2", outputFormat.format(calTs2.getTime()));
+      expected.put("ts3", outputFormat.format(calTs3.getTime()));
+
+      processAndVerifySuccess(record, expected);
+
+    }
+  }
+
+  @Test
+  public void testConvertTimestamp() throws Exception {
+    morphline = createMorphline("test-morphlines/convertTimestamp");
+    Record record = new Record();
+    record.put("ts1", "2011-09-06T14:14:34.789Z"); // "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
+    record.put("ts1", "2012-09-06T14:14:34");
+    record.put("ts1", "2013-09-06");
+    Record expected = new Record();
+    expected.put("ts1", "2011-09-06T07:14:34.789-0700");
+    expected.put("ts1", "2012-09-06T07:14:34.000-0700");
+    expected.put("ts1", "2013-09-05T17:00:00.000-0700");
+    processAndVerifySuccess(record, expected);
+  }
+
+  @Test
+  public void testInsertYear() throws Exception {
+    SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yyyy HH:mm:ssZZZ");
+
+    Date inputDate;
+    Date currentDate;
+    Date expectedDate;
+    Date outputDate;
+    int targetYear;
+
+    //Test year rollover scenario just after rollover
+    inputDate = sdf.parse("01/01/2011 01:02:03UTC");
+    currentDate = sdf.parse("01/01/2011 01:02:03UTC");
+    expectedDate = sdf.parse("01/01/2011 01:02:03UTC");
+    targetYear = 2011;
+    outputDate = DateUtil.insertYear(inputDate, currentDate, -5, targetYear, TimeZone.getTimeZone("UTC"));
+
+    assertEquals(expectedDate.toString(), outputDate.toString());
+
+    //Test year rollover scenario where date is just before Jan 1st
+    inputDate = sdf.parse("31/12/2011 23:02:03UTC");
+    currentDate = sdf.parse("01/01/2011 01:02:03UTC");
+    expectedDate = sdf.parse("31/12/2010 23:02:03UTC");
+    targetYear = 2011;
+    outputDate = DateUtil.insertYear(inputDate, currentDate, -5, targetYear, TimeZone.getTimeZone("UTC"));
+
+    assertEquals(expectedDate.toString(), outputDate.toString());
+
+    //Test year rollover scenario where system clock is slow
+    inputDate = sdf.parse("01/01/2011 01:02:03UTC");
+    currentDate = sdf.parse("31/12/2010 23:02:03UTC");
+    expectedDate = sdf.parse("01/01/2011 01:02:03UTC");
+    targetYear = 2010;
+    outputDate = DateUtil.insertYear(inputDate, currentDate, -5, targetYear, TimeZone.getTimeZone("UTC"));
+
+    assertEquals(expectedDate.toString(), outputDate.toString());
+
+    //Test year rollover scenario just before rollover
+    inputDate = sdf.parse("31/12/2010 23:02:03UTC");
+    currentDate = sdf.parse("31/12/2010 23:02:03UTC");
+    expectedDate = sdf.parse("31/12/2010 23:02:03UTC");
+    targetYear = 2010;
+    outputDate = DateUtil.insertYear(inputDate, currentDate, -5, targetYear, TimeZone.getTimeZone("UTC"));
+
+    assertEquals(expectedDate.toString(), outputDate.toString());
+
+    //Test mid year (30 June: June has no 31st, lenient parsing would silently turn it into 1 July)
+    inputDate = sdf.parse("30/6/2010 23:02:03UTC");
+    currentDate = sdf.parse("30/6/2010 23:02:03UTC");
+    expectedDate = sdf.parse("30/6/2010 23:02:03UTC");
+    targetYear = 2010;
+    outputDate = DateUtil.insertYear(inputDate, currentDate, -5, targetYear, TimeZone.getTimeZone("UTC"));
+
+    assertEquals(expectedDate.toString(), outputDate.toString());
+
+    //Test mid year with lagging message (two months ahead of the clock, so it belongs to the previous year)
+    inputDate = sdf.parse("31/8/2010 23:02:03UTC");
+    currentDate = sdf.parse("30/6/2010 23:02:03UTC");
+    expectedDate = sdf.parse("31/8/2009 23:02:03UTC");
+    targetYear = 2010;
+    outputDate = DateUtil.insertYear(inputDate, currentDate, -5, targetYear, TimeZone.getTimeZone("UTC"));
+
+    assertEquals(expectedDate.toString(), outputDate.toString());
+
+    //Test mid year with future message (less than one month ahead of the clock, so it is kept as is)
+    inputDate = sdf.parse("21/7/2010 23:02:03UTC");
+    currentDate = sdf.parse("30/6/2010 23:02:03UTC");
+    expectedDate = sdf.parse("21/7/2010 23:02:03UTC");
+    targetYear = 2010;
+    outputDate = DateUtil.insertYear(inputDate, currentDate, -5, targetYear, TimeZone.getTimeZone("UTC"));
+
+    assertEquals(expectedDate.toString(), outputDate.toString());
+
+    //Test leap year scenario
+    inputDate = sdf.parse("29/2/2016 23:02:03UTC");
+    currentDate = sdf.parse("29/2/2016 23:02:03UTC");
+    expectedDate = sdf.parse("29/2/2016 23:02:03UTC");
+    targetYear = 2016;
+    outputDate = DateUtil.insertYear(inputDate, currentDate, -5, targetYear, TimeZone.getTimeZone("UTC"));
+
+    assertEquals(expectedDate.toString(), outputDate.toString());
+
+  }
+
+}
diff --git a/kite-morphlines/kite-morphlines-core/src/test/resources/test-morphlines/convertTimestampInsertYear.conf b/kite-morphlines/kite-morphlines-core/src/test/resources/test-morphlines/convertTimestampInsertYear.conf
new file mode 100644
index 0000000000..02ef3426c2
--- /dev/null
+++ b/kite-morphlines/kite-morphlines-core/src/test/resources/test-morphlines/convertTimestampInsertYear.conf
@@ -0,0 +1,61 @@
+# Copyright 2013 Cloudera Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+morphlines : [
+  {
+    id : morphline1
+    importCommands : ["org.kitesdk.**"]
+
+    commands : [
+      { logDebug { format : "input record: {}", args : ["@{}"] } }
+      {
+        convertTimestamp {
+          field : ts1
+          inputFormats : ["MMM dd HH:mm:ss"] # no year in the pattern
+          inputTimezone : UTC
+          outputFormat : "yyyy-MM-dd'T'HH:mm:ss"
+          outputTimezone : UTC
+          insertMissingYear : true # "yyyy" is prepended to input formats that contain no "yy"
+          insertMissingYearMonthOffset : 0 # window reaches 6 months back and 6 months ahead
+        }
+      }
+
+      {
+        convertTimestamp {
+          field : ts2
+          inputFormats : ["MMM dd HH:mm:ss"]
+          inputTimezone : UTC
+          outputFormat : "yyyy-MM-dd'T'HH:mm:ss"
+          outputTimezone : UTC
+          insertMissingYear : true # month offset left at its default of -5 (11 months back, 1 month ahead)
+        }
+      }
+
+      {
+        convertTimestamp {
+          field : ts3
+          inputFormats : ["MMM dd HH:mm:ss"]
+          inputTimezone : UTC
+          outputFormat : "yyyy-MM-dd'T'HH:mm:ss"
+          outputTimezone : UTC
+          insertMissingYear : true
+          insertMissingYearMonthOffset : 0
+          insertMissingYearOffset : -5 # added to the current year before the window is applied
+        }
+      }
+
+      { logDebug { format : "output record: {}", args : ["@{}"] } }
+    ]
+  }
+]