Skip to content

Commit

Permalink
[FLINK-29479][python] Supports whether uses system env for python.
Browse files Browse the repository at this point in the history
This closes apache#21110.
  • Loading branch information
liuyongvs authored and HuangXingBo committed Oct 25, 2022
1 parent 0e61285 commit 8e16cc8
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 2 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/python_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,11 @@
<td>Integer</td>
<td>The maximum number of states cached in a Python UDF worker. Note that this is an experimental flag and might not be available in future releases.</td>
</tr>
<tr>
<td><h5>python.systemenv.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Specify whether to load System Environment when starting Python worker.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ public class PythonOptions {
+ "The interval between each profiling is determined by the config options "
+ "python.fn-execution.bundle.size and python.fn-execution.bundle.time.");

/** The configuration to enable or disable system env for Python execution. */
public static final ConfigOption<Boolean> PYTHON_SYSTEMENV_ENABLED =
ConfigOptions.key("python.systemenv.enabled")
.booleanType()
.defaultValue(true)
.withDescription(
"Specify whether to load System Environment when starting Python worker.");

/** The configuration to enable or disable python operator chaining. */
public static final ConfigOption<Boolean> PYTHON_OPERATOR_CHAINING_ENABLED =
ConfigOptions.key("python.operator-chaining.enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import static org.apache.flink.python.PythonOptions.MAX_BUNDLE_SIZE;
import static org.apache.flink.python.PythonOptions.MAX_BUNDLE_TIME_MILLS;
import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
import static org.apache.flink.python.PythonOptions.PYTHON_SYSTEMENV_ENABLED;
import static org.apache.flink.streaming.api.utils.ClassLeakCleaner.cleanUpLeakingClasses;
import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode;

Expand All @@ -51,6 +52,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> extends AbstractStream

protected final Configuration config;

protected transient boolean systemEnvEnabled;

/** Max number of elements to include in a bundle. */
protected transient int maxBundleSize;

Expand All @@ -77,6 +80,7 @@ public AbstractPythonFunctionOperator(Configuration config) {
@Override
public void open() throws Exception {
try {
this.systemEnvEnabled = config.get(PYTHON_SYSTEMENV_ENABLED);
this.maxBundleSize = config.get(MAX_BUNDLE_SIZE);
if (this.maxBundleSize <= 0) {
this.maxBundleSize = MAX_BUNDLE_SIZE.defaultValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ protected EmbeddedPythonEnvironmentManager createPythonEnvironmentManager() {
return new EmbeddedPythonEnvironmentManager(
dependencyInfo,
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
new HashMap<>(System.getenv()),
systemEnvEnabled ? new HashMap<>(System.getenv()) : new HashMap<>(),
getRuntimeContext().getJobId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected ProcessPythonEnvironmentManager createPythonEnvironmentManager() {
return new ProcessPythonEnvironmentManager(
dependencyInfo,
getContainingTask().getEnvironment().getTaskManagerInfo().getTmpDirectories(),
new HashMap<>(System.getenv()),
systemEnvEnabled ? new HashMap<>(System.getenv()) : new HashMap<>(),
getRuntimeContext().getJobId());
} else {
throw new UnsupportedOperationException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,20 @@ void testPythonClientExecutable() {
configuration.get(PythonOptions.PYTHON_CLIENT_EXECUTABLE);
assertThat(actualPythonClientExecutable).isEqualTo(expectedPythonClientExecutable);
}

@Test
void testPythonSystemEnvEnabled() {
final Configuration configuration = new Configuration();
final boolean isSystemEnvEnabled =
configuration.getBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED);
assertThat(isSystemEnvEnabled)
.isEqualTo(PythonOptions.PYTHON_SYSTEMENV_ENABLED.defaultValue());

final boolean expectedIsSystemEnvEnabled = false;
configuration.setBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED, false);

final boolean actualIsSystemEnvEnabled =
configuration.getBoolean(PythonOptions.PYTHON_SYSTEMENV_ENABLED);
assertThat(actualIsSystemEnvEnabled).isEqualTo(expectedIsSystemEnvEnabled);
}
}

0 comments on commit 8e16cc8

Please sign in to comment.