diff --git a/README.md b/README.md
index 3f2dd48..03c5dd0 100644
--- a/README.md
+++ b/README.md
@@ -18,14 +18,14 @@ gratum with a couple of beliefs about data transformations.
 
 For Gradle:
 
-    compile group: 'com.github.chubbard', name: 'gratum', version: '1.1.7'
+    compile group: 'com.github.chubbard', name: 'gratum', version: '1.1.8'
 
 For Maven:
 
     <dependency>
        <groupId>com.github.chubbard</groupId>
        <artifactId>gratum</artifactId>
-       <version>1.1.7</version>
+       <version>1.1.8</version>
     </dependency>
 
 ## Oh Shell Yeah!
diff --git a/build.gradle b/build.gradle
index bcc76a3..68da47e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -9,8 +9,8 @@ targetCompatibility = 1.8
 sourceCompatibility = 1.8
 
 group = 'com.github.chubbard'
-version = '1.1.7'
-//version = '1.1.6-SNAPSHOT'
+//version = '1.1.7'
+version = '1.1.8'
 
 description = """
 A simplified standalone ETL engine for groovy. Gratum is groovy + datum.
@@ -24,7 +24,7 @@ dependencies {
     implementation group: 'commons-codec', name: 'commons-codec', version: '1.15'
     implementation group: 'commons-cli', name: 'commons-cli', version: '1.4'
 
-    implementation('org.codehaus.groovy:groovy-all:2.5.18') {
+    implementation('org.codehaus.groovy:groovy-all:2.5.23') {
         exclude group: 'org.codehaus.groovy', module:'groovy-swing'
         exclude group: 'org.codehaus.groovy', module:'groovy-testng'
         exclude group: 'org.codehaus.groovy', module:'groovy-console'
diff --git a/src/main/groovy/gratum/concurrency/LocalConcurrentContext.groovy b/src/main/groovy/gratum/concurrency/LocalConcurrentContext.groovy
index b2694a8..9e711a5 100644
--- a/src/main/groovy/gratum/concurrency/LocalConcurrentContext.groovy
+++ b/src/main/groovy/gratum/concurrency/LocalConcurrentContext.groovy
@@ -5,14 +5,15 @@ import gratum.etl.Pipeline
 import gratum.etl.Rejection
 import gratum.etl.RejectionCategory
 import gratum.source.ChainedSource
-import gratum.source.ClosureSource
 import groovy.transform.CompileStatic
+import groovy.transform.stc.ClosureParams
+import groovy.transform.stc.FromString
 
 import java.util.concurrent.ArrayBlockingQueue
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
 
-@CompileStatic
+//@CompileStatic
 public class LocalConcurrentContext implements ConcurrentContext {
 
     final int workerSize
@@ -33,13 +34,19 @@ public class LocalConcurrentContext implements ConcurrentContext {
         latch = new CountDownLatch(workers+1)
     }
 
-    public LocalConcurrentContext spread(@DelegatesTo(LocalConcurrentContext) Closure workerClosure ) {
+    public LocalConcurrentContext spread(
+            @DelegatesTo(LocalConcurrentContext)
+            @ClosureParams(value = FromString, options = ["gratum.etl.Pipeline"])
+            Closure workerClosure ) {
         this.workerClosure = workerClosure
         this.workerClosure.delegate = this
         return this
     }
 
-    public LocalConcurrentContext collect(@DelegatesTo(LocalConcurrentContext) Closure resultsClosure ) {
+    public LocalConcurrentContext collect(
+            @DelegatesTo(LocalConcurrentContext)
+            @ClosureParams(value = FromString, options = ["gratum.etl.Pipeline"])
+            Closure resultsClosure ) {
         this.resultProcessorClosure = resultsClosure
         this.resultProcessorClosure.delegate = this
         return this
@@ -49,7 +56,7 @@ public class LocalConcurrentContext implements ConcurrentContext {
         return { Pipeline pipeline ->
             createWorkers()
             createResultProcessor()
-            pipeline.addStep("Queue to Workers") { Map row ->
+            pipeline.addStep("Queue to Workers") { row ->
                 eventQueue.put( row )
                 return row
             }
@@ -85,7 +92,7 @@ public class LocalConcurrentContext implements ConcurrentContext {
         for( int i = 0; i < workerSize; i++ ) {
             workers << new PipelineWorker("Worker-${i+1}", {
                 try {
-                    Pipeline pipeline = ClosureSource.of({ Pipeline pipeline ->
+                    Pipeline pipeline = Pipeline.create("Worker") { pipeline ->
                         boolean done = false
                         while (!done && !Thread.interrupted()) {
                             Map row = eventQueue.poll()
@@ -96,21 +103,21 @@ public class LocalConcurrentContext implements ConcurrentContext {
                                 pipeline.process(row)
                             }
                         }
-                    }).name("Worker").into()
-                    .onRejection { Pipeline rej ->
-                        rej.addStep { Map row ->
-                            // so when we play this down the rejections pipeline it'll expect a REJECT_KEY to be there so we recreate it
-                            // because at this point the REJECTED_KEY property has been removed so we re-add it. Not great.
-                            Rejection reject = new Rejection(row.rejectionReason as String, row.rejectionCategory as RejectionCategory, row.rejectionStep as String)
-                            row[Pipeline.REJECTED_KEY] = reject
-                            resultQueue.put(row)
-                            return row
-                        }
-                        return
-                    }
+                    }
+                    pipeline.onRejection { rej ->
+                        rej.addStep("replaceRejectKey") { row ->
+                            // When this row is replayed down the rejections pipeline it expects REJECTED_KEY to be present,
+                            // but that property has already been removed, so we recreate it here. Not great.
+                            Rejection reject = new Rejection(row["rejectionReason"] as String, row["rejectionCategory"] as RejectionCategory, row["rejectionStep"] as String)
+                            row[Pipeline.REJECTED_KEY] = reject
+                            resultQueue.put((Map)row)
+                            return row
+                        }
+                        return
+                    }
 
-                    LoadStatistic stat = ((Pipeline) workerClosure(pipeline))
-                        .addStep("Queue to Results") { Map row ->
+                    LoadStatistic stat = workerClosure.call(pipeline)
+                        .addStep("Queue to Results") { row ->
                             resultQueue.put(row)
                             return row
                         }
@@ -127,7 +134,7 @@ public class LocalConcurrentContext implements ConcurrentContext {
     void createResultProcessor() {
         resultProcessor = new PipelineWorker( "Results Processor", {
             try {
-                Pipeline pipeline = ClosureSource.of({ Pipeline pipeline ->
+                Pipeline pipeline = Pipeline.create("Result Processor") { pipeline ->
                     boolean done = false
                     while (!done && !Thread.interrupted()) {
                         Map row = resultQueue.poll(10, TimeUnit.SECONDS)
@@ -141,7 +148,7 @@ public class LocalConcurrentContext implements ConcurrentContext {
                             done = true
                         }
                     }
-                }).name("Result Processor").into()
+                }
                 LoadStatistic stats = resultProcessorClosure.call(pipeline)
                     .go()
                 return stats
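The worker and result-processor threads above now build their pipelines with Pipeline.create(...) instead of ClosureSource.of(...).name(...).into(). A minimal sketch of that factory, separate from the patch itself (the step name and rows are illustrative):

    Pipeline p = Pipeline.create("numbers") { pipeline ->
        // the closure receives the Pipeline and feeds rows into it
        (1..3).each { i -> pipeline.process([id: i]) }
    }
    p.addStep("print") { row ->
        println row          // [id:1], then [id:2], then [id:3]
        return row
    }
    p.go()                   // runs the pipeline and returns a LoadStatistic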
diff --git a/src/main/groovy/gratum/etl/Pipeline.groovy b/src/main/groovy/gratum/etl/Pipeline.groovy
index 4193716..7061b83 100644
--- a/src/main/groovy/gratum/etl/Pipeline.groovy
+++ b/src/main/groovy/gratum/etl/Pipeline.groovy
@@ -89,7 +89,10 @@ public class Pipeline {
      * passed to send a row into the pipeline.
      * @return The pipeline attached to results of the startClosure.
      */
-    public static Pipeline create( String name, @DelegatesTo(Pipeline) Closure startClosure ) {
+    public static Pipeline create( String name,
+                                   @DelegatesTo(Pipeline)
+                                   @ClosureParams( value = FromString, options = ["gratum.etl.Pipeline"])
+                                   Closure startClosure ) {
         ClosureSource.of( startClosure ).name( name ).into()
     }
 
@@ -120,7 +123,10 @@ public class Pipeline {
      * @param step The code used to process each row on the Pipeline.
      * @return this Pipeline.
      */
-    public Pipeline prependStep( String name = null, @DelegatesTo(Pipeline) Closure step ) {
+    public Pipeline prependStep( String name = null,
+                                 @DelegatesTo(Pipeline)
+                                 @ClosureParams(value = FromString, options = ["java.util.Map"])
+                                 Closure<Map<String,Object>> step ) {
         step.delegate = this
         processChain.add(0, new Step( name, step ) )
         return this
@@ -136,13 +142,17 @@ public class Pipeline {
      * @param step The code used to process each row on the Pipeline.
      * @return this Pipeline.
      */
-    public Pipeline addStep( String name = null, @DelegatesTo(Pipeline) Closure step ) {
+    public Pipeline addStep( String name = null,
+                             @ClosureParams( value = FromString, options = ["java.util.Map"])
+                             @DelegatesTo(Pipeline) Closure<Map<String,Object>> step ) {
         step.delegate = this
         processChain << new Step( name, step )
         return this
     }
 
-    public Pipeline addStep(GString name, @DelegatesTo(Pipeline) Closure<Map<String,Object>> step) {
+    public Pipeline addStep(GString name,
+                            @ClosureParams( value = FromString, options = ["java.util.Map"])
+                            @DelegatesTo(Pipeline) Closure<Map<String,Object>> step) {
         this.addStep( name.toString(), step )
     }
 
@@ -165,13 +175,15 @@ public class Pipeline {
      * @param branch Closure that's passed the rejection Pipeline
      * @return this Pipeline
      */
-    public Pipeline onRejection( @DelegatesTo(Pipeline) Closure branch ) {
+    public Pipeline onRejection( @DelegatesTo(Pipeline)
+                                 @ClosureParams( value = FromString, options = ["gratum.etl.Pipeline"])
+                                 Closure branch ) {
         if( parent ) {
             parent.onRejection( branch )
         } else {
             if( !rejections ) rejections = new Pipeline("Rejections(${name})")
-            rejections.addStep("Remap rejections to columns") { Map row ->
-                Map current = (Map)row.clone()
+            rejections.addStep("Remap rejections to columns") { row ->
+                Map<String,Object> current = (Map<String,Object>)row.clone()
                 Rejection rejection = (Rejection)current.remove(REJECTED_KEY)
                 current.rejectionCategory = rejection.category
                 current.rejectionReason = rejection.reason
@@ -200,7 +212,7 @@ public class Pipeline {
         src.parent = this
         this.after {
             int line = 0
-            src.addStep("concat(${src.name})") { Map row ->
+            src.addStep("concat(${src.name})") { row ->
                 line++
                 original.process( row, line )
                 return row
@@ -216,9 +228,11 @@ public class Pipeline {
      * @param callback A callback that is passed a row, and returns a boolean.  All rows that return a false are rejected.
      * @return A Pipeline that contains only the rows that matched the filter.
      */
-    public Pipeline filter(String name = "filter()", @DelegatesTo(Pipeline) Closure callback) {
+    public Pipeline filter(String name = "filter()",
+                           @ClosureParams( value = FromString, options = ["java.util.Map"])
+                           @DelegatesTo(Pipeline) Closure callback) {
         callback.delegate = this
-        addStep( name ) { Map row ->
+        addStep( name ) { row ->
             if( !callback(row) ) {
                 return reject(row,"Row did not match the filter closure.", RejectionCategory.IGNORE_ROW )
             }
@@ -260,7 +274,7 @@ public class Pipeline {
      */
     public Pipeline filter( Map columns ) {
         Condition condition = new Condition( columns )
-        addStep( "filter ${ condition }" ) { Map row ->
+        addStep( "filter ${ condition }" ) { row ->
             if( condition.matches(row) ) {
                 return row
             } else {
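Both filter flavors annotated above keep their existing semantics; a short usage sketch, not part of the patch (rows and step names are made up):

    from([[name: 'Ann', gender: 'female', age: 53],
          [name: 'Bob', gender: 'male', age: 17]])
        .filter([gender: 'male'])                                // match on column values
        .filter("adults") { row -> (row.age as Integer) >= 18 }  // match with a closure
        .go()

Rows that fail either step are rejected under RejectionCategory.IGNORE_ROW rather than silently dropped, so they still show up in onRejection handlers and in the load statistics.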
@@ -276,7 +290,7 @@ public class Pipeline {
      * @return Pipeline where all columns of each row have white space removed.
      */
     public Pipeline trim() {
-        addStep("trim()") { Map row ->
+        addStep("trim()") { row ->
             row.each { String key, Object value -> row[key] = (value as String)?.trim() }
             return row
         }
@@ -296,7 +310,7 @@ public class Pipeline {
 
         Pipeline tail = split( branch )
 
-        addStep( "branch(${branchName})" ) { Map row ->
+        addStep( "branch(${branchName})" ) { row ->
             branch.process( new LinkedHashMap(row) )
             return row
         }
@@ -324,7 +338,7 @@ public class Pipeline {
         Pipeline tail = split(branch)
 
         Condition selection = new Condition( condition )
-        addStep( "branch(${condition})" ) { Map row ->
+        addStep( "branch(${condition})" ) { row ->
             if( selection.matches( row )) {
                 branch.process( new LinkedHashMap(row) )
             }
@@ -361,7 +375,7 @@ public class Pipeline {
      */
     public Pipeline join( Pipeline other, def columns, boolean left = false ) {
         Map<String, List<Map<String,Object>>> cache = [:]
-        other.addStep("join(${other.name}, ${columns}).cache") { Map row ->
+        other.addStep("join(${other.name}, ${columns}).cache") { row ->
             String key = keyOf(row, rightColumn(columns) )
             if( !cache.containsKey(key) ) cache.put(key, [])
             cache[key] << row
@@ -411,9 +425,12 @@ public class Pipeline {
      * @param decider a Closure which decides if the values from a prior row will be used to fill in missing values in the current row.
      * @return A Pipeline where the row's empty column values are filled in by the previous row.
      */
-    public Pipeline fillDownBy( Closure decider ) {
+    public Pipeline fillDownBy( @DelegatesTo(Pipeline)
+                                @ClosureParams( value = FromString, options = ["java.util.Map", "java.util.Map"])
+                                Closure decider ) {
         Map previousRow = null
-        addStep("fillDownBy()") { Map row ->
+        decider.delegate = this
+        addStep("fillDownBy()") { row ->
             if( previousRow && decider( row, previousRow ) ) {
                 row.each { String col, Object value ->
                     // todo refactor valid_to out for excluded
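A sketch of how the join step above is typically wired up (hypothetical rows; the columns argument may be a collection of shared column names or a map of left-to-right column names, as the leftColumn/rightColumn helpers further down show):

    Pipeline people  = from([[id: 1, name: 'Ann'], [id: 2, name: 'Bob']])
    Pipeline hobbies = from([[id: 1, hobby: 'chess']])
    people.join( hobbies, ['id'], true )   // left join on id; Bob survives with no hobby
        .printRow()
        .go()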
@@ -435,7 +452,7 @@ public class Pipeline {
      * @return A Pipeline where all of the columns in the keys of the Map are renamed to the Map's corresponding values.
      */
     public Pipeline renameFields( Map fieldNames ) {
-        addStep("renameFields(${fieldNames}") { Map row ->
+        addStep("renameFields(${fieldNames})") { row ->
             for( String src : fieldNames.keySet() ) {
                 String dest = fieldNames.get( src )
                 row[dest] = row.remove( src )
@@ -461,14 +478,14 @@ public class Pipeline {
     public Pipeline intersect( Pipeline other, def columns ) {
         Map<String, List<Map<String,Object>>> cache = [:]
-        other.addStep("intersect(${other.name}, ${columns}).cache") { Map row ->
+        other.addStep("intersect(${other.name}, ${columns}).cache") { row ->
             String key = keyOf(row, rightColumn(columns) )
             if( !cache.containsKey(key) ) cache.put(key, [])
             cache[key] << row
             return row
         }.start()
 
-        addStep("intersect(${this.name}, ${columns})") { Map row ->
+        addStep("intersect(${this.name}, ${columns})") { row ->
             String key = keyOf( row, leftColumn(columns) )
             row.included = cache.containsKey(key)
             return row
@@ -478,21 +495,21 @@ public class Pipeline {
         return this
     }
 
-    private Iterable leftColumn(def columns) {
+    private Iterable<String> leftColumn(Object columns) {
         if( columns instanceof Collection ) {
             return ((Collection)columns)
         } else if( columns instanceof Map ) {
-            return ((Map)columns).keySet()
+            return ((Map<String,String>)columns).keySet()
         } else {
             return [columns.toString()]
         }
     }
 
-    private Iterable rightColumn(def columns) {
+    private Iterable<String> rightColumn(Object columns) {
         if( columns instanceof Collection ) {
             return ((Collection)columns)
         } else if( columns instanceof Map ) {
-            return ((Map)columns).values()
+            return ((Map<String,String>)columns).values()
         } else {
             return [columns.toString()]
         }
@@ -510,7 +527,7 @@ public class Pipeline {
      */
     public Pipeline groupBy( String... columns ) {
         Map cache = [:]
-        addStep("groupBy(${columns.join(',')})") { Map row ->
+        addStep("groupBy(${columns.join(',')})") { row ->
             Map current = cache
             columns.eachWithIndex { String col, int i ->
                 if( !current.containsKey(row[col]) ) {
@@ -561,7 +578,7 @@ public class Pipeline {
         }
 
         List ordered = []
-        addStep("sort(${columns})") { Map row ->
+        addStep("sort(${columns})") { row ->
             //int index = Collections.binarySearch( ordered, row, comparator )
             //ordered.add( Math.abs(index + 1), row )
             ordered << row
@@ -585,7 +602,7 @@ public class Pipeline {
      * @return A Pipeline where all rows contain a java.lang.Double at the given column
      */
     Pipeline asDouble(String column) {
-        addStep("asDouble(${column})") { Map row ->
+        addStep("asDouble(${column})") { row ->
             String value = row[column] as String
             try {
                 if (value) row[column] = Double.parseDouble(value)
@@ -602,7 +619,7 @@ public class Pipeline {
      * @return A Pipeline where all rows contain a java.lang.Integer at given column
      */
     Pipeline asInt(String column) {
-        addStep("asInt(${column})") { Map row ->
+        addStep("asInt(${column})") { row ->
             String value = row[column] as String
             try {
                 if( value ) row[column] = Integer.parseInt(value)
@@ -619,7 +636,7 @@ public class Pipeline {
      * @return A Pipeline where all rows contain a java.lang.Boolean at given column
      */
     Pipeline asBoolean(String column) {
-        addStep("asBoolean(${column}") { Map row ->
+        addStep("asBoolean(${column})") { row ->
             String value = row[column]
             if( value ) {
                 switch( value ) {
@@ -666,7 +683,7 @@ public class Pipeline {
      */
     Pipeline asDate(String column, String... formats = ["yyyy-MM-dd"]) {
         List dateFormats = formats.collect { new SimpleDateFormat(it) }
-        addStep("asDate(${column}, ${formats})") { Map row ->
+        addStep("asDate(${column}, ${formats})") { row ->
             if(row[column] instanceof Date ) return row
             String val = row[column] as String
             if (val) {
@@ -736,7 +753,7 @@ public class Pipeline {
      * @return this Pipeline
      */
     public Pipeline printRow(String... columns) {
-        addStep("print()") { Map row ->
+        addStep("print()") { row ->
             if( columns ) {
                 println( "[ ${columns.toList().collect { row[it] }.join(',')} ]" )
             } else {
@@ -749,7 +766,7 @@ public class Pipeline {
 
     public Pipeline progress( int col = 50 ) {
         int line = 1
-        addStep("progress()") { Map row ->
+        addStep("progress()") { row ->
             line++
             printf(".")
             if( line % col ) println()
@@ -764,7 +781,7 @@ public class Pipeline {
      * @return The Pipeline where each row has a fieldname set to the given value
      */
     public Pipeline setField(String fieldName, Object value ) {
-        addStep("setField(${fieldName})") { Map row ->
+        addStep("setField(${fieldName})") { row ->
             row[fieldName] = value
             return row
         }
@@ -776,9 +793,11 @@ public class Pipeline {
      * @param fieldValue The closure that returns a value to set the given field's name to.
      * @return The Pipeline where the fieldname exists in every row
      */
-    public Pipeline addField(String fieldName, @DelegatesTo(Pipeline) Closure fieldValue) {
+    public Pipeline addField(String fieldName,
+                             @ClosureParams( value = FromString, options = ["java.util.Map"])
+                             @DelegatesTo(Pipeline) Closure fieldValue) {
         fieldValue.delegate = this
-        addStep("addField(${fieldName})") { Map row ->
+        addStep("addField(${fieldName})") { row ->
             Object value = fieldValue(row)
             if( value instanceof Rejection ) {
                 row[REJECTED_KEY] = value
@@ -795,13 +814,15 @@ public class Pipeline {
      * which will always remove the fieldName if not provided.
      *
      * @param fieldName the name of the field to remove depending on what the optional closure returns
-     * @param removeLogic an optiona closure that when given can return true or false to indicate to remove
+     * @param removeLogic an optional closure that when given can return true or false to indicate to remove
      * the field or not.  If not provided the field is always removed.
      * @return The pipeline where the fieldName has been removed when the removeLogic closure returns true or itself is null.
      */
-    public Pipeline removeField(String fieldName, @DelegatesTo(Pipeline) Closure removeLogic = null) {
+    public Pipeline removeField(String fieldName,
+                                @ClosureParams( value = FromString, options = ["java.util.Map"])
+                                @DelegatesTo(Pipeline) Closure removeLogic = null) {
         removeLogic?.delegate = this
-        addStep( "removeField(${fieldName})") { Map row ->
+        addStep( "removeField(${fieldName})") { row ->
             if( removeLogic == null || removeLogic(row) ) {
                 row.remove(fieldName)
             }
@@ -817,7 +838,7 @@ public class Pipeline {
      * @return The pipeline where only the given columns are returned
      */
     public Pipeline clip(String... columns) {
-        addStep( "clip(${columns.join(",")}") { Map row ->
+        addStep( "clip(${columns.join(",")})") { row ->
             Map result = [:] as Map
             for( String key : row.keySet() ) {
                 if( columns.contains(key) ) {
@@ -837,7 +858,7 @@ public class Pipeline {
      */
     Pipeline unique(String column) {
         Set unique = [:] as HashSet
-        addStep("unique(${column})") { Map row ->
+        addStep("unique(${column})") { row ->
             if( unique.contains(row[column]) ) {
                 return reject(row, "Non-unique row returned", RejectionCategory.IGNORE_ROW)
             }
@@ -854,7 +875,9 @@ public class Pipeline {
      * @param closure Closure that returns an Iterable used to inject those rows into downstream steps.
      * @return Pipeline that will receive all members of the Iterable returned from the given closure.
      */
-    public Pipeline inject(GString name, @DelegatesTo(Pipeline) Closure<Iterable<Map<String,Object>>> closure ) {
+    public Pipeline inject(GString name,
+                           @ClosureParams( value = FromString, options = ["java.util.Map"])
+                           @DelegatesTo(Pipeline) Closure<Iterable<Map<String,Object>>> closure ) {
         return this.inject( name.toString(), closure )
     }
 
@@ -866,11 +889,13 @@ public class Pipeline {
      * @param closure Takes a Map and returns a Collection<Map> that will be fed into the downstream steps
      * @return The Pipeline that will receive all members of the Iterable returned from the closure.
      */
-    public Pipeline inject(String name, @DelegatesTo(Pipeline) Closure<Iterable<Map<String,Object>>> closure) {
+    public Pipeline inject(String name,
+                           @ClosureParams( value = FromString, options = ["java.util.Map"])
+                           @DelegatesTo(Pipeline) Closure<Iterable<Map<String,Object>>> closure) {
         Pipeline next = new Pipeline(name, this)
         next.src = new ChainedSource( this )
         closure.delegate = this
-        addStep(name) { Map row ->
+        addStep(name) { row ->
             Iterable<Map<String,Object>> result = closure.call( row )
             if( result == null ) {
                 row = reject( row, "Unknown Reason", RejectionCategory.REJECTION )
@@ -899,12 +924,14 @@ public class Pipeline {
      *
      * @return A Pipeline whose records consist of the records from all Pipelines returned from the closure
      */
-    public Pipeline exchange(Closure closure) {
+    public Pipeline exchange( @DelegatesTo(Pipeline)
+                              @ClosureParams( value = FromString, options = ["java.util.Map"])
+                              Closure closure) {
         Pipeline next = new Pipeline( name, this )
         next.src = new ChainedSource(this)
-        addStep("exchange(${next.name})") { Map row ->
+        addStep("exchange(${next.name})") { row ->
             Pipeline pipeline = closure( row )
-            pipeline.addStep("Exchange Bridge(${pipeline.name})") { Map current ->
+            pipeline.addStep("Exchange Bridge(${pipeline.name})") { current ->
                 next.process( current )
                 return current
             }
@@ -919,7 +946,9 @@ public class Pipeline {
      * @param closure Takes a Map and returns a Collection<Map> that will be fed into the downstream steps
      * @return The Pipeline that will receive all members of the Collection returned from the closure.
      */
-    public Pipeline inject( @DelegatesTo(Pipeline) Closure closure) {
+    public Pipeline inject( @DelegatesTo(Pipeline)
+                            @ClosureParams( value = FromString, options = ["java.util.Map"])
+                            Closure closure) {
         this.inject("inject()", closure )
     }
 
@@ -931,7 +960,7 @@ public class Pipeline {
      * column is empty.
      */
     public Pipeline defaultValues( Map defaults ) {
-        this.addStep("defaultValues for ${defaults.keySet()}") { Map row ->
+        this.addStep("defaultValues for ${defaults.keySet()}") { row ->
             defaults.each { String column, Object value ->
                 if( !row[column] ) row[column] = value
             }
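The inject variants above fan a single row out into many; a sketch assuming a row whose features column holds a list of maps (names are illustrative, not from this patch):

    from([[product: 'X', features: [[name: 'Weight'], [name: 'Volume']]]])
        .inject("explode features") { row ->
            row.features.collect { feature -> [product: row.product, name: feature.name] }
        }
        .addStep("show") { row ->
            println row       // one row per feature, each tagged with its product
            return row
        }
        .go()

Returning null from the inject closure rejects the row, per the implementation above.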
@@ -945,7 +974,7 @@ public class Pipeline {
      * @return A pipeline where the rows will have the destination columns set to the source column if empty/null.
      */
     public Pipeline defaultsBy( Map defaults ) {
-        this.addStep("defaultsBy for ${defaults.keySet()}") { Map row ->
+        this.addStep("defaultsBy for ${defaults.keySet()}") { row ->
             defaults.each { String destColumn, String srcColumn ->
                 if( !row[destColumn] ) row[destColumn] = row[srcColumn]
             }
@@ -966,7 +995,7 @@ public class Pipeline {
      */
     public Pipeline limit(long limit, boolean halt = true) {
         int current = 0
-        this.addStep("Limit(${limit})") { Map row ->
+        this.addStep("Limit(${limit})") { row ->
             current++
             if( current > limit ) {
                 if( halt ) {
@@ -986,7 +1015,8 @@ public class Pipeline {
      * @param applyToPipeline The Closure that is passed this Pipeline
      * @return this Pipeline
      */
-    public Pipeline apply(Closure applyToPipeline) {
+    public Pipeline apply( @ClosureParams( value = FromString, options = ["gratum.etl.Pipeline"])
+                           Closure applyToPipeline) {
         return applyToPipeline.call( this ) ?: this
     }
 
@@ -1003,7 +1033,7 @@ public class Pipeline {
      * replaced with the given withClause
      */
     public Pipeline replaceAll(String column, Pattern regEx, String withClause) {
-        addStep( "replaceAll(${column}, ${regEx.toString()})") { Map row ->
+        addStep( "replaceAll(${column}, ${regEx.toString()})") { row ->
             String v = row[column]
             row[column] = v?.replaceAll( regEx, withClause )
             return row
@@ -1020,7 +1050,7 @@ public class Pipeline {
      * @return this Pipeline
      */
     public Pipeline replaceValues(String column, Map values ) {
-        addStep( "replaceValues(${column}, ${values})" ) { Map row ->
+        addStep( "replaceValues(${column}, ${values})" ) { row ->
             String v = row[column]
             if( values.containsKey(v) ) {
                 row[column] = values[ v ] ?: row[column]
@@ -1038,10 +1068,13 @@ public class Pipeline {
      * @param streamProperty The property that holds a stream object to be encrypted.
      * @param configure The Closure that is passed the PgpContext used to configure how the stream will be encrypted.
      */
-    public Pipeline encryptPgp(String streamProperty, Closure configure ) {
+    public Pipeline encryptPgp(String streamProperty,
+                               @ClosureParams( value = FromString, options = ["gratum.pgp.PgpContext"])
+                               @DelegatesTo(Pipeline) Closure configure ) {
         PgpContext pgp = new PgpContext()
+        configure.delegate = this
         configure.call( pgp )
-        addStep("encrypt(${streamProperty})") { Map row ->
+        addStep("encrypt(${streamProperty})") { row ->
             File encryptedTemp = File.createTempFile("pgp-encrypted-output-${streamProperty}".toString(), ".gpg")
             InputStream stream = row[streamProperty] as InputStream
             try {
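A combined sketch of the defaultValues and limit steps above (illustrative rows; not part of the patch):

    from([[rank: 'Captain', assignment: null], [rank: 'Major', assignment: 'Recon']])
        .defaultValues(assignment: 'None')   // fill null/empty columns with a fixed value
        .limit(100, false)                   // let at most 100 rows through; halt=false avoids stopping the whole pipeline
        .go()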
@@ -1072,10 +1105,13 @@ public class Pipeline {
      * @param configure The closure called with a PgpContext object to further configure how it will decrypt the stream.
      * @return a Pipeline where the streamProperty contains the decrypted stream.
      */
-    public Pipeline decryptPgp(String streamProperty, Closure configure ) {
+    public Pipeline decryptPgp(String streamProperty,
+                               @ClosureParams( value = FromString, options = ["gratum.pgp.PgpContext"])
+                               @DelegatesTo(Pipeline) Closure configure ) {
         PgpContext pgp = new PgpContext()
+        configure.delegate = this
         configure.call( pgp )
-        addStep("decrypt(${streamProperty})") { Map row ->
+        addStep("decrypt(${streamProperty})") { row ->
             InputStream stream = row[streamProperty] as InputStream
             File decryptedFile = File.createTempFile("pgp-decrypted-output-${streamProperty}", "out")
             try {
@@ -1126,7 +1162,7 @@ public class Pipeline {
      * @param lineNumber The lineNumber from the {@link gratum.source.Source} to use when tracking this row through the Pipeline
      */
     public boolean process(Map row, int lineNumber = -1) {
-        Map next = row
+        Map<String,Object> next = row
         for (Step step : processChain) {
             next = step.execute( this, next, lineNumber )
             if( next == null || next[REJECTED_KEY] ) return false
@@ -1159,12 +1195,12 @@ public class Pipeline {
         return new Rejection( reason, category )
     }
 
-    public static Map reject(Map row, String reason, RejectionCategory category = RejectionCategory.REJECTION ) {
+    public static Map<String,Object> reject(Map<String,Object> row, String reason, RejectionCategory category = RejectionCategory.REJECTION ) {
         row[ REJECTED_KEY ] = reject( reason, category )
         return row
     }
 
-    void reject(Map row, int lineNumber = -1) {
+    void reject(Map<String,Object> row, int lineNumber = -1) {
         rejections?.process( row, lineNumber )
     }
 
diff --git a/src/main/groovy/gratum/source/ClosureSource.groovy b/src/main/groovy/gratum/source/ClosureSource.groovy
index 29a95d8..274522d 100644
--- a/src/main/groovy/gratum/source/ClosureSource.groovy
+++ b/src/main/groovy/gratum/source/ClosureSource.groovy
@@ -2,6 +2,8 @@ package gratum.source
 
 import gratum.etl.Pipeline
 import groovy.transform.CompileStatic
+import groovy.transform.stc.ClosureParams
+import groovy.transform.stc.FromString
 
 @CompileStatic
 class ClosureSource extends AbstractSource {
@@ -12,7 +14,7 @@ class ClosureSource extends AbstractSource {
         this.logic = closure
     }
 
-    public static ClosureSource of(Closure closure) {
+    public static ClosureSource of(@ClosureParams( value = FromString, options = ["gratum.etl.Pipeline"]) Closure closure) {
         return new ClosureSource( closure )
     }
 
diff --git a/src/main/groovy/gratum/source/CsvSource.groovy b/src/main/groovy/gratum/source/CsvSource.groovy
index 6086a12..89c6935 100644
--- a/src/main/groovy/gratum/source/CsvSource.groovy
+++ b/src/main/groovy/gratum/source/CsvSource.groovy
@@ -137,7 +137,7 @@ public class CsvSource extends AbstractSource {
 
     @Override
     boolean processRow(List header, List row) {
-        Map obj = [:]
+        Map<String,Object> obj = [:]
         for( int i = 0; i < row.size(); i++ ) {
             obj[header[i]] = row[i]
         }
diff --git a/src/main/groovy/gratum/source/JdbcSource.groovy b/src/main/groovy/gratum/source/JdbcSource.groovy
index 566e0f8..b31b572 100644
--- a/src/main/groovy/gratum/source/JdbcSource.groovy
+++ b/src/main/groovy/gratum/source/JdbcSource.groovy
@@ -56,7 +56,7 @@ class JdbcSource extends AbstractSource {
                 columns << md.getColumnName(i)
             }
         } ) { GroovyResultSet row ->
-            Map result = [:]
+            Map<String,Object> result = [:]
             columns.eachWithIndex { String col, int index ->
                 result[col] = row[index]
             }
diff --git a/src/test/groovy/gratum/concurrency/LocalConcurrentContextTest.groovy b/src/test/groovy/gratum/concurrency/LocalConcurrentContextTest.groovy
index 8554053..d923fe3 100644
--- a/src/test/groovy/gratum/concurrency/LocalConcurrentContextTest.groovy
+++ b/src/test/groovy/gratum/concurrency/LocalConcurrentContextTest.groovy
@@ -44,14 +44,14 @@ class LocalConcurrentContextTest {
     @Test
     void testRejections() {
         GratumFixture.withResource("titanic.csv") { stream ->
-            LoadStatistic stats = csv("titantic", stream, ",")
+            LoadStatistic stats = csv("titanic", stream, ",")
                 .apply( context.spread { pipeline ->
-                    pipeline.filter("Only Females") { Map row ->
+                    pipeline.filter("Only Females") { row ->
                         row.Sex == "female"
                     }
                 }
                 .collect { Pipeline pipeline ->
-                    pipeline.addStep("Assert we are on the results thread") { Map row ->
+                    pipeline.addStep("Assert we are on the results thread") { row ->
                         assert Thread.currentThread().name.startsWith("Results")
                         return row
                     }
diff --git a/src/test/groovy/gratum/etl/GratumFixture.groovy b/src/test/groovy/gratum/etl/GratumFixture.groovy
index d8db180..bf000bf 100644
--- a/src/test/groovy/gratum/etl/GratumFixture.groovy
+++ b/src/test/groovy/gratum/etl/GratumFixture.groovy
@@ -1,5 +1,8 @@
 package gratum.etl
 
+import groovy.transform.stc.ClosureParams
+import groovy.transform.stc.FromString
+
 class GratumFixture {
     public static final List _people = [
             [id: 1, name: 'Bill Rhodes', age: 53, gender: 'male', comment: """
@@ -45,7 +48,7 @@ I had the chili dog and the onion rings, but I wish you had tater tots.
         return getClass().getResourceAsStream("/" + resource )
     }
 
-    public static void withResource(String resource, Closure callback) {
+    public static void withResource(String resource, @ClosureParams( value = FromString, options = ["java.io.InputStream"] ) Closure callback) {
         this.getResourceAsStream("/" + resource )?.withStream { InputStream stream ->
             callback.call( stream )
         }
diff --git a/src/test/groovy/gratum/etl/PipelineTest.groovy b/src/test/groovy/gratum/etl/PipelineTest.groovy
index f0be76e..d7ee143 100644
--- a/src/test/groovy/gratum/etl/PipelineTest.groovy
+++ b/src/test/groovy/gratum/etl/PipelineTest.groovy
@@ -1,7 +1,6 @@
 package gratum.etl
 
 import gratum.sink.Sink
-import gratum.source.ClosureSource
 import gratum.source.CollectionSource
 import gratum.source.CsvSource
 import org.junit.Test
@@ -90,8 +89,8 @@ class PipelineTest {
 
         LoadStatistic statistic = csv("src/test/resources/titanic.csv")
             .filter([Pclass: filter, Sex: "male"])
-            .onRejection { Pipeline rej ->
-                rej.addStep("verify not sex != male && pClass != 3") { Map row ->
+            .onRejection { rej ->
+                rej.addStep("verify not sex != male && pClass != 3") { row ->
                     assert !filter.contains(row.Pclass) || row.Sex != "male"
                     return row
                 }
@@ -213,7 +212,7 @@ class PipelineTest {
         LoadStatistic statistic = csv("src/test/resources/titanic.csv")
             .filter([Sex: 'K'])
             .groupBy("Sex")
-            .addStep("Assert groupBy(Sex)") { Map row ->
+            .addStep("Assert groupBy(Sex)") { row ->
                 assertTrue( row.isEmpty() )
                 return row
             }
@@ -245,7 +244,7 @@ class PipelineTest {
             [name: 'Rob', atBats: '100', hits: '75', battingAverage: '0.75'],
             [name: 'Sean', atBats: '20', hits: 'none', battingAverage: 'none']
         ]))
-        .addStep("Assert we get rows") { Map row ->
+        .addStep("Assert we get rows") { row ->
             called++
             return row
         }
@@ -264,7 +263,7 @@ class PipelineTest {
             [firstName: '\tRick\t', lastName: 'Spade ']
         ])
         .trim()
-        .addStep("Assert all values were trimmed") { Map row ->
+        .addStep("Assert all values were trimmed") { row ->
            assert !row.firstName.contains(' ')
            assert !row.firstName.contains('\t')
            assert !row.lastName.contains(' ')
@@ -280,7 +279,7 @@ class PipelineTest {
    void testSetField() {
        LoadStatistic stats = from( GratumFixture.people )
            .setField("completed", true)
-           .addStep("Assert completed is defined") { Map row ->
+           .addStep("Assert completed is defined") { row ->
                assert row.completed
                return row
            }
@@ -292,13 +291,13 @@ class PipelineTest {
     @Test
     void renameFields() {
         csv("src/test/resources/titanic.csv")
-            .addStep("Test Sex Exists") { Map row ->
+            .addStep("Test Sex Exists") { row ->
                 assertTrue("Assert row.Sex exists", row.containsKey("Sex"))
                 assertTrue("Assert row.Age exists", row.containsKey("Age"))
                 return row
             }
             .renameFields([Sex: "gender", "Age": "age"])
-            .addStep("Test Sex renamed to gender and Age to age") { Map row ->
+            .addStep("Test Sex renamed to gender and Age to age") { row ->
                 assertTrue( row.containsKey("gender") )
                 assertTrue( row.containsKey("age") )
                 return row
@@ -309,10 +308,10 @@ class PipelineTest {
     @Test
     void testAddField() {
         csv("src/test/resources/titanic.csv")
-            .addField("survived") { Map row ->
+            .addField("survived") { row ->
                 return true
             }
-            .addStep("Test Field added") { Map row ->
+            .addStep("Test Field added") { row ->
                 assertTrue( row.containsKey("survived") )
                 return row
             }
@@ -323,7 +322,7 @@ class PipelineTest {
     void testFillDownBy() {
         int count = 0
         csv("src/test/resources/fill_down.csv")
-            .addStep("Assert fields are missing data.") { Map row ->
+            .addStep("Assert fields are missing data.") { row ->
                 if( !row.first_name ) {
                     count++
                     assert !row.first_name
@@ -335,7 +334,7 @@ class PipelineTest {
             .fillDownBy { Map row, Map previousRow ->
                 return row.id == previousRow.id
             }
-            .addStep("Assert values were filled down") { Map row ->
+            .addStep("Assert values were filled down") { row ->
                 row.each { String key, Object value ->
                     assertNotNull( "Assert ${key} is filled in with a value", value )
                     assertTrue("Assert that ${key} is non-empty", !(value as String).isEmpty() )
@@ -353,7 +352,7 @@ class PipelineTest {
         csv("src/test/resources/titanic.csv")
             .branch { Pipeline pipeline ->
                 return pipeline.filter([Sex: "female"])
-                    .addStep("Verify sex was filtered out") { Map row ->
+                    .addStep("Verify sex was filtered out") { row ->
                         assertTrue( row.Sex == "female" )
                         return row
                     }
@@ -364,12 +363,12 @@ class PipelineTest {
                 return row
             }
         }
-        .addStep("Verify branch field is NOT on the outter Pipeline") { Map row ->
+        .addStep("Verify branch field is NOT on the outer Pipeline") { row ->
            assert row.branch == null
            return row
        }
        .filter([Sex:"male"])
-       .addStep("Verify sex was filtered to male") { Map row ->
+       .addStep("Verify sex was filtered to male") { row ->
            assertTrue( row.Sex == "male")
            return row
        }
@@ -381,7 +380,7 @@ class PipelineTest {
     void testBranchWithGroupBy() {
         csv("src/test/resources/titanic.csv")
             .branch { Pipeline p ->
-                return p.groupBy("Sex", "Pclass").addStep { Map row ->
+                return p.groupBy("Sex", "Pclass").addStep { row ->
                     assertNotNull( row["male"] )
                     assertNotNull( row["male"]["3"] )
                     assertNotNull( row["male"]["2"] )
@@ -454,7 +453,7 @@ class PipelineTest {
                 return row
             }
             .onRejection { Pipeline pipeline ->
-                pipeline.addStep("Verify rejection") { Map row ->
+                pipeline.addStep("Verify rejection") { row ->
                     rejections++
                     return row
                 }
@@ -487,7 +486,7 @@ class PipelineTest {
     @Test
     void testSort() {
         String lastHobby
-        from(GratumFixture.hobbies).sort("hobby").addStep("Assert order is increasing") { Map row ->
+        from(GratumFixture.hobbies).sort("hobby").addStep("Assert order is increasing") { row ->
             if( lastHobby ) assertTrue( "Assert ${lastHobby} < ${row.hobby}", lastHobby.compareTo( row.hobby ) <= 0 )
             lastHobby = row.hobby
             return row
@@ -513,7 +512,7 @@ class PipelineTest {
         LoadStatistic stats = from(GratumFixture.hobbies)
             .addStep("Rejection") { Map row ->
                 row.id > 1 ? row : reject( row,"${row.id} is too small", RejectionCategory.REJECTION)
             }
             .onRejection { Pipeline pipeline ->
-                pipeline.addStep("Save rejections") { Map row ->
+                pipeline.addStep("Save rejections") { row ->
                     rejections << row
                     return row
                 }
@@ -537,7 +536,7 @@ class PipelineTest {
                 expectedCount = json.number
                 message = json.message
                 json.people
-            }.addStep("assert astros in space") { Map row ->
+            }.addStep("assert astros in space") { row ->
                 actualCount++
                 // assert that we received the data we expected, but we can't really test anything because this will change over time
                 assertNotNull( row.name )
@@ -565,7 +564,7 @@ class PipelineTest {
             [name: 'Kirby', dateOfBirth: new Date()],
             [name: 'Huck', dateOfBirth: '08/12/1994']
         ]).asDate('dateOfBirth', 'yyyy-MM-dd', 'MM/dd/yyyy')
-        .addStep("Assert all are Dates") { Map row ->
+        .addStep("Assert all are Dates") { row ->
             if( row['name'] == 'Kirby' ) {
                 kirby = true
             }
@@ -640,7 +639,7 @@ class PipelineTest {
             [name: 'Lily', member: '1']
         ])
         .asBoolean('member')
-        .addStep("Assert all are boolean true") { Map row ->
+        .addStep("Assert all are boolean true") { row ->
             assert row.member instanceof Boolean
             assert row.member
             return row
@@ -658,7 +657,7 @@ class PipelineTest {
             [name: 'Lily', member: '0']
         ])
         .asBoolean('member')
-        .addStep("Assert all are boolean true") { Map row ->
+        .addStep("Assert all are boolean true") { row ->
             assertTrue( row.member instanceof Boolean )
             assert row.member != true
             return row
@@ -672,7 +671,7 @@ class PipelineTest {
         LoadStatistic stats = from( GratumFixture.people )
             .filter([gender: 'male'])
             .sort('age')
-            .addStep('ageN > age1') {Map row ->
+            .addStep('ageN > age1') {row ->
                 assert row.age >= last
                 last = (Integer)row.age
                 return row
@@ -687,7 +686,7 @@ class PipelineTest {
     @Test
     void testHeaderless() {
         LoadStatistic stats = csv("src/test/resources/headerless.csv", "|", ["Date", "status", "client", "server", "url", "length", "thread", "userAgent", "referer"])
-            .addStep("Assert Columns Exist") { Map row->
+            .addStep("Assert Columns Exist") { row->
                 assertNotNull( row.status )
                 assertNotNull( row.Date )
                 assertNotNull( row.client )
@@ -707,7 +706,7 @@ class PipelineTest {
     @Test
     void testClip() {
-        LoadStatistic stat = from(GratumFixture.people).clip("name", "gender").addStep("Test resulting rows") { Map row ->
+        LoadStatistic stat = from(GratumFixture.people).clip("name", "gender").addStep("Test resulting rows") { row ->
             assert row.size() == 2
             assertTrue( row.containsKey("name") )
             assertTrue( row.containsKey("gender") )
@@ -721,7 +720,7 @@ class PipelineTest {
     @Test
     void testRagged() {
         LoadStatistic stats = csv("src/test/resources/ragged.csv", ",")
-            .addStep("Assert Row") { Map row ->
+            .addStep("Assert Row") { row ->
                 assert row.containsKey("assignment")
                 switch(row.rank) {
                     case "Captain":
@@ -750,7 +749,7 @@ class PipelineTest {
         try {
             LoadStatistic stat = from(GratumFixture.people)
                 .save(tmp.absolutePath, "|")
-                .addStep("Verify we have a CSV file on the Pipeline") { Map row ->
+                .addStep("Verify we have a CSV file on the Pipeline") { row ->
                     assert row.file != null
                     assert row.filename == tmp.absolutePath
                     assert row.stream != null
@@ -762,7 +761,7 @@ class PipelineTest {
             assert stat.rejections == 0
 
             csv(tmp.absolutePath, "|")
-                .addStep("assert 5 columns") { Map row ->
+                .addStep("assert 5 columns") { row ->
                     assert row.size() == 5
                     assert row["comment"].contains("\n")
                     return row
@@ -775,7 +774,7 @@ class PipelineTest {
     @Test
     public void testEscaping() {
         LoadStatistic stats = csv("src/test/resources/ragged.csv", ",")
-            .addStep("Test Escaping") { Map row ->
+            .addStep("Test Escaping") { row ->
                 switch(row.rank) {
                     case "Captain":
                         assert row.comment.contains("\n")
@@ -801,7 +800,7 @@ class PipelineTest {
     @Test
     public void testDefaultValues() {
         LoadStatistic stats = csv("src/test/resources/ragged.csv", ",")
-            .addStep("Assert null exists") { Map row ->
+            .addStep("Assert null exists") { row ->
                 if( ['Captain', 'Private First Class'].contains( row.rank ) ) {
                     assert !row.assignment
                 }
@@ -810,7 +809,7 @@ class PipelineTest {
             .defaultValues([
                 assignment: 'None'
             ])
-            .addStep("Assert defaults") { Map row ->
+            .addStep("Assert defaults") { row ->
                 assert row.assignment != null
                 switch( row.rank ) {
                     case 'Captain':
@@ -835,14 +834,14 @@ class PipelineTest {
             [name: 'State 2', tax: 5.5, defaultTax: 4.5],
             [name: 'State 3', tax: null, defaultTax: 3.0 ]
         ])
-        .addStep("Assert null exists") { Map row ->
+        .addStep("Assert null exists") { row ->
             if( row.name != 'State 2' ) {
                 assert row.tax == null
             }
             return row
         }
         .defaultsBy([tax: 'defaultTax'])
-        .addStep("Assert non null tax") { Map row ->
+        .addStep("Assert non null tax") { row ->
            assert row.tax != null
            assert row.tax instanceof Number
            switch( row.name ) {
@@ -894,7 +893,7 @@ class PipelineTest {
             .escaping(false)
             .into()
             .trim()
-            .addStep("Test csv without escaping") { Map row ->
+            .addStep("Test csv without escaping") { row ->
                 switch( line ) {
                     case 1:
                         assert row.ConNameFirst == "martini"
@@ -921,7 +920,7 @@ class PipelineTest {
             .filter(gender: "male")
             .save("testRejectionsFilterAcrossMultiplePipelines.csv", "|")
             .onRejection { Pipeline rejections ->
-                rejections.addStep("Assert We see males") { Map row ->
+                rejections.addStep("Assert We see males") { row ->
                     rejectionsCalled = true
                     assert row.gender == "female"
                     return row
@@ -972,10 +971,10 @@ class PipelineTest {
                     [ name: 'Volume', value: 11.06]
                 ]
             ]
-        ]).exchange { Map row ->
+        ]).exchange { row ->
             return from((Collection)row.features).setField("product", row.product )
         }
-        .addStep("Check the rows and files") { Map row ->
+        .addStep("Check the rows and files") { row ->
             assert row.containsKey("product")
             assert row.containsKey("name")
             assert row.containsKey("value")
@@ -1010,7 +1009,7 @@ class PipelineTest {
     void testReplaceAll() {
         LoadStatistic stat = from([date: '(Tue) 12/12/1999'], [date: '(Thu) 9/17/2001'], [date:'(Wed) 3/1/2022'])
             .replaceAll("date", ~/\(\w+\)/, "")
-            .addStep("Test for date") { Map row ->
+            .addStep("Test for date") { row ->
                 assert ["(Tue)", "(Wed)", "(Thu)"].findAll() {r -> !row.date.contains(r) }.size() == 3
                 row
             }
@@ -1027,7 +1026,7 @@ class PipelineTest {
                 '2':'red',
                 '3':'green'
             ])
-            .addStep("Assert") { Map row ->
+            .addStep("Assert") { row ->
                 assert ['blue', 'red', 'green'].contains(row.color_code)
                 return row
             }