Skip to content

Commit

Permalink
Merge pull request #1 from apache/master
Browse files Browse the repository at this point in the history
update fork
  • Loading branch information
amoght authored Nov 26, 2019
2 parents 0bd4451 + b446304 commit 0de962c
Show file tree
Hide file tree
Showing 151 changed files with 4,883 additions and 1,509 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def find_release_time_from_python_compatibility_checking_service(dep_name, versi

def request_session_with_retries():
"""
Create a http session with retries
Create an http session with retries
"""
session = requests.Session()
retries = Retry(total=3)
Expand Down
38 changes: 38 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -294,3 +294,41 @@ release {
pushToRemote = ''
}
}

// Reports linkage errors across multiple Apache Beam artifact ids.
//
// To use (from the root of project):
// ./gradlew -Ppublishing -PjavaLinkageArtifactIds=artifactId1,artifactId2,... :checkJavaLinkage
//
// For example:
// ./gradlew -Ppublishing -PjavaLinkageArtifactIds=beam-sdks-java-core,beam-sdks-java-io-jdbc :checkJavaLinkage
//
// Note that this task publishes artifacts into your local Maven repository.
if (project.hasProperty('javaLinkageArtifactIds')) {
if (!project.hasProperty('publishing')) {
throw new GradleException('You can only check linkage of Java artifacts if you specify -Ppublishing on the command line as well.')
}

configurations { linkageCheckerJava }
dependencies {
linkageCheckerJava "com.google.cloud.tools:dependencies:1.0.1"
}

// We need to evaluate all the projects first so that we can find depend on all the
// publishMavenJavaPublicationToMavenLocal tasks below.
for (p in rootProject.subprojects) {
if (!p.path.equals(project.path)) {
evaluationDependsOn(p.path)
}
}

project.task('checkJavaLinkage', type: JavaExec) {
dependsOn project.getTasksByName('publishMavenJavaPublicationToMavenLocal', true /* recursively */)
classpath = project.configurations.linkageCheckerJava
main = 'com.google.cloud.tools.opensource.classpath.LinkageCheckerMain'
args '-a', project.javaLinkageArtifactIds.split(',').collect({"${project.ext.mavenGroupId}:${it}:${project.version}"}).join(',')
doLast {
println "NOTE: This task published artifacts into your local Maven repository. You may want to remove them manually."
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ class BeamModulePlugin implements Plugin<Project> {
aws_java_sdk2_dynamodb : "software.amazon.awssdk:dynamodb:$aws_java_sdk2_version",
aws_java_sdk2_sdk_core : "software.amazon.awssdk:sdk-core:$aws_java_sdk2_version",
aws_java_sdk2_sns : "software.amazon.awssdk:sns:$aws_java_sdk2_version",
aws_java_sdk2_sqs : "software.amazon.awssdk:sqs:$aws_java_sdk2_version",
bigdataoss_gcsio : "com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version",
bigdataoss_util : "com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version",
cassandra_driver_core : "com.datastax.cassandra:cassandra-driver-core:$cassandra_driver_version",
Expand Down Expand Up @@ -1924,7 +1925,6 @@ class BeamModulePlugin implements Plugin<Project> {
"--input=/etc/profile",
"--output=/tmp/py-wordcount-direct",
"--runner=${runner}",
"--experiments=worker_threads=100",
"--parallelism=2",
"--shutdown_sources_on_final_watermark",
"--sdk_worker_parallelism=1",
Expand Down
4 changes: 2 additions & 2 deletions model/fn-execution/src/main/proto/beam_fn_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ service BeamFnControl {
// matching instruction id.
// Stable
message InstructionRequest {
// (Required) An unique identifier provided by the runner which represents
// (Required) A unique identifier provided by the runner which represents
// this requests execution. The InstructionResponse MUST have the matching id.
string instruction_id = 1;

Expand Down Expand Up @@ -564,7 +564,7 @@ service BeamFnData {
*/

message StateRequest {
// (Required) An unique identifier provided by the SDK which represents this
// (Required) A unique identifier provided by the SDK which represents this
// requests execution. The StateResponse corresponding with this request
// will have the matching id.
string id = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public CounterCell(MetricName name) {
this.name = name;
}

@Override
public void reset() {
dirty.afterModification();
value.set(0L);
}

/**
* Increment the counter by the given amount.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ public DistributionCell(MetricName name) {
this.name = name;
}

@Override
public void reset() {
dirty.afterModification();
value.set(DistributionData.EMPTY);
}

/** Increment the distribution by the given amount. */
@Override
public void update(long n) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public static ExecutionStateSampler newForTest(MillisProvider clock) {

@Nullable private Future<Void> executionSamplerFuture = null;

/** Reset the state sampler. */
public void reset() {
lastSampleTimeMillis = 0;
}

/**
* Called to start the ExecutionStateSampler. Until the returned {@link Closeable} is closed, the
* state sampler will periodically sample the current state of all the threads it has been asked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,17 @@ public ExecutionStateTracker(ExecutionStateSampler sampler) {
this.sampler = sampler;
}

/** Reset the execution status. */
public void reset() {
trackedThread = null;
currentState = null;
numTransitions = 0;
millisSinceLastTransition = 0;
transitionsAtLastSample = 0;
nextLullReportMs = LULL_REPORT_MS;
CURRENT_TRACKERS.entrySet().removeIf(entry -> entry.getValue() == this);
}

@VisibleForTesting
public static ExecutionStateTracker newForTest() {
return new ExecutionStateTracker(ExecutionStateSampler.newForTest());
Expand Down Expand Up @@ -261,6 +272,16 @@ public long getMillisSinceLastTransition() {
return millisSinceLastTransition;
}

/** Return the number of transitions since the last sample. */
public long getTransitionsAtLastSample() {
return transitionsAtLastSample;
}

/** Return the time of the next lull report. */
public long getNextLullReportMs() {
return nextLullReportMs;
}

protected void takeSample(long millisSinceLastSample) {
// These variables are read by Sampler thread, and written by Execution and Progress Reporting
// threads.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public GaugeCell(MetricName name) {
this.name = name;
}

@Override
public void reset() {
dirty.afterModification();
gaugeValue.set(GaugeData.empty());
}

/** Set the gauge to the given value. */
@Override
public void set(long value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,7 @@ public interface MetricCell<DataT> extends Serializable {

/** Return the cumulative value of this metric. */
DataT getCumulative();

/** Reset this metric. */
void reset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ public MetricsContainerImpl(@Nullable String stepName) {
this.stepName = stepName;
}

/** Reset the metrics. */
public void reset() {
reset(counters);
reset(distributions);
reset(gauges);
}

private void reset(MetricsMap<MetricName, ? extends MetricCell<?>> cells) {
for (MetricCell<?> cell : cells.values()) {
cell.reset();
}
}

/**
* Return a {@code CounterCell} named {@code metricName}. If it doesn't exist, create a {@code
* Metric} with the specified name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ public void update(String step, MetricsContainerImpl container) {
getContainer(step).update(container);
}

/** Reset the metric containers. */
public void reset() {
for (MetricsContainerImpl metricsContainer : metricsContainers.values()) {
metricsContainer.reset();
}
unboundContainer.reset();
}

@Override
public boolean equals(Object object) {
if (object instanceof MetricsContainerStepMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public SimpleExecutionState(
}
}

/** Reset the totalMillis spent in the state. */
public void reset() {
this.totalMillis = 0;
}

public String getUrn() {
return this.urn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ public void register(SimpleExecutionState state) {
this.executionStates.add(state);
}

/** Reset the registered SimpleExecutionStates. */
public void reset() {
for (SimpleExecutionState state : executionStates) {
state.reset();
}
}

/** @return Execution Time MonitoringInfos based on the tracked start or finish function. */
public List<MonitoringInfo> getExecutionTimeMonitoringInfos() {
List<MonitoringInfo> monitoringInfos = new ArrayList<MonitoringInfo>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,15 @@ public void testNotEquals() {
Assert.assertNotEquals(counterCell, differentName);
Assert.assertNotEquals(counterCell.hashCode(), differentName.hashCode());
}

@Test
public void testReset() {
CounterCell counterCell = new CounterCell(MetricName.named("namespace", "name"));
counterCell.inc(1);
assertThat(counterCell.getCumulative(), equalTo(1L));

counterCell.reset();
assertThat(counterCell.getCumulative(), equalTo(0L));
assertThat(counterCell.getDirty(), equalTo(new DirtyState()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,15 @@ public void testNotEquals() {
Assert.assertNotEquals(distributionCell, differentName);
Assert.assertNotEquals(distributionCell.hashCode(), differentName.hashCode());
}

@Test
public void testReset() {
DistributionCell distributionCell = new DistributionCell(MetricName.named("namespace", "name"));
distributionCell.update(2);
assertThat(distributionCell.getCumulative(), equalTo(DistributionData.create(2, 1, 2, 2)));

distributionCell.reset();
assertThat(distributionCell.getCumulative(), equalTo(DistributionData.EMPTY));
assertThat(distributionCell.getDirty(), equalTo(new DirtyState()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ public void testLullDetectionOccurs() throws Exception {
assertThat(step1act1.lullReported, equalTo(true));
}

@Test
public void testReset() throws Exception {
sampler.lastSampleTimeMillis = 100L;
sampler.reset();
assertThat(sampler.lastSampleTimeMillis, equalTo(0L));
}

private ExecutionStateTracker createTracker() {
return new ExecutionStateTracker(sampler);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.metrics;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;

import java.io.Closeable;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState;
import org.joda.time.DateTimeUtils.MillisProvider;
import org.junit.Before;
import org.junit.Test;

/** Tests for {@link ExecutionStateTracker}. */
public class ExecutionStateTrackerTest {

private MillisProvider clock;
private ExecutionStateSampler sampler;

@Before
public void setUp() {
clock = mock(MillisProvider.class);
sampler = ExecutionStateSampler.newForTest(clock);
}

private static class TestExecutionState extends ExecutionState {

private long totalMillis = 0;

public TestExecutionState(String stateName) {
super(stateName);
}

@Override
public void takeSample(long millisSinceLastSample) {
totalMillis += millisSinceLastSample;
}

@Override
public void reportLull(Thread trackedThread, long millis) {}
}

private final TestExecutionState testExecutionState = new TestExecutionState("activity");

@Test
public void testReset() throws Exception {
ExecutionStateTracker tracker = createTracker();
try (Closeable c1 = tracker.activate(new Thread())) {
try (Closeable c2 = tracker.enterState(testExecutionState)) {
sampler.doSampling(400);
assertThat(testExecutionState.totalMillis, equalTo(400L));
}
}

tracker.reset();
assertThat(tracker.getTrackedThread(), equalTo(null));
assertThat(tracker.getCurrentState(), equalTo(null));
assertThat(tracker.getNumTransitions(), equalTo(0L));
assertThat(tracker.getMillisSinceLastTransition(), equalTo(0L));
assertThat(tracker.getTransitionsAtLastSample(), equalTo(0L));
assertThat(tracker.getNextLullReportMs(), equalTo(TimeUnit.MINUTES.toMillis(5)));
}

private ExecutionStateTracker createTracker() {
return new ExecutionStateTracker(sampler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,15 @@ public void testNotEquals() {
Assert.assertNotEquals(gaugeCell, differentName);
Assert.assertNotEquals(gaugeCell.hashCode(), differentName.hashCode());
}

@Test
public void testReset() {
GaugeCell gaugeCell = new GaugeCell(MetricName.named("namespace", "name"));
gaugeCell.set(2);
assertThat(gaugeCell.getCumulative().value(), equalTo(GaugeData.create(2).value()));

gaugeCell.reset();
assertThat(gaugeCell.getCumulative(), equalTo(GaugeData.empty()));
assertThat(gaugeCell.getDirty(), equalTo(new DirtyState()));
}
}
Loading

0 comments on commit 0de962c

Please sign in to comment.