Release 1.1.8 (#11)

* Add @ClosureParams and @DelegatesTo annotations to more methods, so we can more easily write code with static typing.

* Add more type annotations to make the type safety of closures easier to understand. Had to remove @CompileStatic on LocalConcurrentContext because of a compiler bug. Tried moving to Groovy 3.0.22, but it had compiler errors that I couldn't figure out.

* Bump version to 1.1.8

* Bump groovy version to 2.5.23

---------

Co-authored-by: charlie <[email protected]>
chubbard and charlie authored Oct 30, 2024
1 parent 3ac5e01 commit 5d703ab
Showing 10 changed files with 182 additions and 135 deletions.
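
For context, the annotations mentioned in the commit message are Groovy's @ClosureParams and @DelegatesTo, which tell the static compiler and IDEs what a closure's parameter types are and what its delegate is. A minimal, hypothetical sketch of the pattern (the Greeter class below is made up for illustration and is not part of gratum or this commit):

    import groovy.transform.CompileStatic
    import groovy.transform.stc.ClosureParams
    import groovy.transform.stc.FromString

    @CompileStatic
    class Greeter {
        String name = "world"

        // @ClosureParams declares that the closure receives a single String argument;
        // @DelegatesTo declares that unresolved calls inside the closure go to this Greeter.
        Greeter greet(@DelegatesTo(Greeter)
                      @ClosureParams(value = FromString, options = ["java.lang.String"])
                      Closure<String> body) {
            body.delegate = this
            println body.call("Hello, " + name)
            return this
        }
    }

    // Usage: inside the closure, `msg` is known to be a String and the delegate is the Greeter.
    new Greeter().greet { msg -> msg.toUpperCase() }

In the LocalConcurrentContext diff below, spread() and collect() apply this same pattern: the closure's parameter is a gratum.etl.Pipeline and its delegate is the LocalConcurrentContext.
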
4 changes: 2 additions & 2 deletions README.md
@@ -18,14 +18,14 @@ gratum with a couple of beliefs about data transformations.

For Gradle:

compile group: 'com.github.chubbard', name: 'gratum', version: '1.1.7'
compile group: 'com.github.chubbard', name: 'gratum', version: '1.1.8'

For Maven:

<dependency>
<groupId>com.github.chubbard</groupId>
<artifactId>gratum</artifactId>
<version>1.1.7</version>
<version>1.1.8</version>
</dependency>
## Oh Shell Yeah!
6 changes: 3 additions & 3 deletions build.gradle
@@ -9,8 +9,8 @@ targetCompatibility = 1.8
sourceCompatibility = 1.8

group = 'com.github.chubbard'
version = '1.1.7'
//version = '1.1.6-SNAPSHOT'
//version = '1.1.7'
version = '1.1.8'
description = """
A simplified standalone ETL engine for groovy. Gratum is groovy + datum.
"""
@@ -24,7 +24,7 @@ dependencies {
implementation group: 'commons-codec', name: 'commons-codec', version: '1.15'
implementation group: 'commons-cli', name: 'commons-cli', version: '1.4'

implementation('org.codehaus.groovy:groovy-all:2.5.18') {
implementation('org.codehaus.groovy:groovy-all:2.5.23') {
exclude group: 'org.codehaus.groovy', module:'groovy-swing'
exclude group: 'org.codehaus.groovy', module:'groovy-testng'
exclude group: 'org.codehaus.groovy', module:'groovy-console'
51 changes: 29 additions & 22 deletions src/main/groovy/gratum/concurrency/LocalConcurrentContext.groovy
@@ -5,14 +5,15 @@ import gratum.etl.Pipeline
import gratum.etl.Rejection
import gratum.etl.RejectionCategory
import gratum.source.ChainedSource
import gratum.source.ClosureSource
import groovy.transform.CompileStatic
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.FromString

import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

@CompileStatic
//@CompileStatic
public class LocalConcurrentContext implements ConcurrentContext {

final int workerSize
@@ -33,13 +34,19 @@ public class LocalConcurrentContext implements ConcurrentContext {
latch = new CountDownLatch(workers+1)
}

public LocalConcurrentContext spread(@DelegatesTo(LocalConcurrentContext) Closure<Pipeline> workerClosure ) {
public LocalConcurrentContext spread(
@DelegatesTo(LocalConcurrentContext)
@ClosureParams(value = FromString, options = ["gratum.etl.Pipeline"])
Closure<Pipeline> workerClosure ) {
this.workerClosure = workerClosure
this.workerClosure.delegate = this
return this
}

public LocalConcurrentContext collect(@DelegatesTo(LocalConcurrentContext) Closure<Pipeline> resultsClosure ) {
public LocalConcurrentContext collect(
@DelegatesTo(LocalConcurrentContext)
@ClosureParams(value = FromString, options = ["gratum.etl.Pipeline"])
Closure<Pipeline> resultsClosure ) {
this.resultProcessorClosure = resultsClosure
this.resultProcessorClosure.delegate = this
return this
@@ -49,7 +56,7 @@
return { Pipeline pipeline ->
createWorkers()
createResultProcessor()
pipeline.addStep("Queue to Workers") { Map<String,Object> row ->
pipeline.addStep("Queue to Workers") { row ->
eventQueue.put( row )
return row
}
@@ -85,7 +92,7 @@
for( int i = 0; i < workerSize; i++ ) {
workers << new PipelineWorker("Worker-${i+1}", {
try {
Pipeline pipeline = ClosureSource.of({ Pipeline pipeline ->
Pipeline pipeline = Pipeline.create("Worker") { pipeline ->
boolean done = false
while (!done && !Thread.interrupted()) {
Map<String, Object> row = eventQueue.poll()
Expand All @@ -96,21 +103,21 @@ public class LocalConcurrentContext implements ConcurrentContext {
pipeline.process(row)
}
}
}).name("Worker").into()
.onRejection { Pipeline rej ->
rej.addStep { Map<String, Object> row ->
// so when we play this down the rejections pipeline it'll expect a REJECT_KEY to be there so we recreate it
// because at this point the REJECTED_KEY property has been removed so we re-add it. Not great.
Rejection reject = new Rejection(row.rejectionReason as String, row.rejectionCategory as RejectionCategory, row.rejectionStep as String)
row[Pipeline.REJECTED_KEY] = reject
resultQueue.put(row)
return row
}
return
}
}
pipeline.onRejection { rej ->
rej.addStep("replaceRejectKey") { row ->
// so when we play this down the rejections pipeline it'll expect a REJECT_KEY to be there so we recreate it
// because at this point the REJECTED_KEY property has been removed so we re-add it. Not great.
Rejection reject = new Rejection(row["rejectionReason"] as String, row["rejectionCategory"] as RejectionCategory, row["rejectionStep"] as String)
row[Pipeline.REJECTED_KEY] = reject
resultQueue.put((Map<String,Object>)row)
return row
}
return
}

LoadStatistic stat = ((Pipeline) workerClosure(pipeline))
.addStep("Queue to Results") { Map row ->
LoadStatistic stat = workerClosure.call(pipeline)
.addStep("Queue to Results") { row ->
resultQueue.put(row)
return row
}
@@ -127,7 +134,7 @@
void createResultProcessor() {
resultProcessor = new PipelineWorker( "Results Processor", {
try {
Pipeline pipeline = ClosureSource.of({ Pipeline pipeline ->
Pipeline pipeline = Pipeline.create("Result Processor") { pipeline ->
boolean done = false
while (!done && !Thread.interrupted()) {
Map<String, Object> row = resultQueue.poll(10, TimeUnit.SECONDS)
Expand All @@ -141,7 +148,7 @@ public class LocalConcurrentContext implements ConcurrentContext {
done = true
}
}
}).name("Result Processor").into()
}
LoadStatistic stats = resultProcessorClosure.call(pipeline)
.go()
return stats