Skip to content

Commit

Permalink
Add PCF2 ShardCombiner stage service (#1429)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #1429

I just following D36325473 (deb4801)

We had a new stage service for PCF2.0 PL, following the instructions here: https://www.internalfb.com/intern/wiki/Private_Computation_Platform_(PCP)/Internal_Developer_Guide/How_to_add_a_stage_to_PCS/Step_2:_Add_a_stage_service/

Reviewed By: robotal

Differential Revision: D38340542

fbshipit-source-id: c4bec85d7b86acded8e9a7c4f808866eb4f36040
  • Loading branch information
tbags authored and facebook-github-bot committed Aug 12, 2022
1 parent 419dce0 commit d40e295
Show file tree
Hide file tree
Showing 6 changed files with 343 additions and 0 deletions.
1 change: 1 addition & 0 deletions fbpcs/onedocker_binary_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class OneDockerBinaryNames(Enum):
PCF2_ATTRIBUTION = "private_attribution/pcf2_attribution"
PCF2_AGGREGATION = "private_attribution/pcf2_aggregation"
SHARD_AGGREGATOR = "private_attribution/shard-aggregator"
PCF2_SHARD_COMBINER = "private_attribution/pcf2_shard-combiner"

PID_CLIENT = "pid/private-id-client"
PID_SERVER = "pid/private-id-server"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ def pcf2_aggregation_stage_output_base_path(self) -> str:
def shard_aggregate_stage_output_path(self) -> str:
return self._get_stage_output_path("shard_aggregation_stage", "json")

@property
def pcf2_shard_combine_stage_output_path(self) -> str:
return self._get_stage_output_path("pcf2_shard_combiner_stage", "json")

def _get_stage_output_path(self, stage: str, extension_type: str) -> str:
return os.path.join(
self.product_config.common.output_dir,
Expand Down
15 changes: 15 additions & 0 deletions fbpcs/private_computation/repository/private_computation_game.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class GameNames(Enum):
LIFT = "lift"
PCF2_LIFT = "pcf2_lift"
SHARD_AGGREGATOR = "shard_aggregator"
PCF2_SHARD_COMBINER = "pcf2_shard_combiner"
DECOUPLED_ATTRIBUTION = "decoupled_attribution"
DECOUPLED_AGGREGATION = "decoupled_aggregation"
PCF2_ATTRIBUTION = "pcf2_attribution"
Expand Down Expand Up @@ -75,6 +76,20 @@ class GameNamesValue(TypedDict):
OneDockerArgument(name="visibility", required=False),
],
},
GameNames.PCF2_SHARD_COMBINER.value: {
"onedocker_package_name": OneDockerBinaryNames.PCF2_SHARD_COMBINER.value,
"arguments": [
OneDockerArgument(name="input_base_path", required=True),
OneDockerArgument(name="num_shards", required=True),
OneDockerArgument(name="output_path", required=True),
OneDockerArgument(name="metrics_format_type", required=True),
OneDockerArgument(name="threshold", required=True),
OneDockerArgument(name="first_shard_index", required=False),
OneDockerArgument(name="log_cost", required=False),
OneDockerArgument(name="run_name", required=False),
OneDockerArgument(name="visibility", required=False),
],
},
GameNames.DECOUPLED_ATTRIBUTION.value: {
"onedocker_package_name": OneDockerBinaryNames.DECOUPLED_ATTRIBUTION.value,
"arguments": [
Expand Down
189 changes: 189 additions & 0 deletions fbpcs/private_computation/service/pcf2_shard_combiner_stage_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

# pyre-strict

import logging

from typing import DefaultDict, List, Optional

from fbpcp.service.mpc import MPCService
from fbpcs.common.entity.pcs_mpc_instance import PCSMPCInstance
from fbpcs.onedocker_binary_config import OneDockerBinaryConfig
from fbpcs.onedocker_binary_names import OneDockerBinaryNames
from fbpcs.private_computation.entity.infra_config import PrivateComputationGameType
from fbpcs.private_computation.entity.pcs_feature import PCSFeature
from fbpcs.private_computation.entity.private_computation_instance import (
PrivateComputationInstance,
PrivateComputationInstanceStatus,
)
from fbpcs.private_computation.entity.product_config import (
AttributionConfig,
ResultVisibility,
)
from fbpcs.private_computation.repository.private_computation_game import GameNames
from fbpcs.private_computation.service.constants import DEFAULT_LOG_COST_TO_S3
from fbpcs.private_computation.service.private_computation_stage_service import (
PrivateComputationStageService,
)
from fbpcs.private_computation.service.utils import (
create_and_start_mpc_instance,
get_updated_pc_status_mpc_game,
map_private_computation_role_to_mpc_party,
)


class ShardCombinerStageService(PrivateComputationStageService):
"""Handles business logic for the private computation combine aggregate metrics stage
Private attributes:
_onedocker_binary_config_map: Stores a mapping from mpc game to OneDockerBinaryConfig (binary version and tmp directory)
_mpc_svc: creates and runs MPC instances
_log_cost_to_s3: TODO
_container_timeout: optional duration in seconds before cloud containers timeout
"""

def __init__(
self,
onedocker_binary_config_map: DefaultDict[str, OneDockerBinaryConfig],
mpc_service: MPCService,
log_cost_to_s3: bool = DEFAULT_LOG_COST_TO_S3,
container_timeout: Optional[int] = None,
) -> None:
self._onedocker_binary_config_map = onedocker_binary_config_map
self._mpc_service = mpc_service
self._log_cost_to_s3 = log_cost_to_s3
self._container_timeout = container_timeout

# TODO T88759390: Make this function truly async. It is not because it calls blocking functions.
# Make an async version of run_async() so that it can be called by Thrift
async def run_async(
self,
pc_instance: PrivateComputationInstance,
server_ips: Optional[List[str]] = None,
) -> PrivateComputationInstance:
"""Runs the private computation combine aggregate metrics stage
Args:
pc_instance: the private computation instance to run aggregate metrics with
server_ips: only used by the partner role. These are the ip addresses of the publisher's containers.
Returns:
An updated version of pc_instance that stores an MPCInstance
"""

num_shards = (
pc_instance.infra_config.num_mpc_containers
* pc_instance.infra_config.num_files_per_mpc_container
)

# TODO T101225989: map aggregation_type from the compute stage to metrics_format_type
metrics_format_type = (
"lift"
if pc_instance.infra_config.game_type is PrivateComputationGameType.LIFT
else "ad_object"
)

binary_name = OneDockerBinaryNames.PCF2_SHARD_COMBINER.value
binary_config = self._onedocker_binary_config_map[binary_name]

# Get output path of previous stage depending on what stage flow we are using
# Using "PrivateComputationDecoupledStageFlow" instead of PrivateComputationDecoupledStageFlow.get_cls_name() to avoid
# circular import error.
if pc_instance.get_flow_cls_name in [
"PrivateComputationDecoupledStageFlow",
"PrivateComputationDecoupledLocalTestStageFlow",
]:
input_stage_path = pc_instance.decoupled_aggregation_stage_output_base_path
elif pc_instance.get_flow_cls_name in [
"PrivateComputationPCF2StageFlow",
"PrivateComputationPCF2LocalTestStageFlow",
"PrivateComputationPIDPATestStageFlow",
]:
input_stage_path = pc_instance.pcf2_aggregation_stage_output_base_path
elif pc_instance.get_flow_cls_name in [
"PrivateComputationPCF2LiftStageFlow",
"PrivateComputationPCF2LiftLocalTestStageFlow",
]:
input_stage_path = pc_instance.pcf2_lift_stage_output_base_path
else:
if pc_instance.has_feature(PCSFeature.PRIVATE_LIFT_PCF2_RELEASE):
input_stage_path = pc_instance.pcf2_lift_stage_output_base_path
else:
input_stage_path = pc_instance.compute_stage_output_base_path

if self._log_cost_to_s3:
run_name = pc_instance.infra_config.instance_id

if pc_instance.product_config.common.post_processing_data:
pc_instance.product_config.common.post_processing_data.s3_cost_export_output_paths.add(
f"sa-logs/{run_name}_{pc_instance.infra_config.role.value.title()}.json",
)
else:
run_name = ""

# Create and start MPC instance
game_args = [
{
"input_base_path": input_stage_path,
"metrics_format_type": metrics_format_type,
"num_shards": num_shards,
"output_path": pc_instance.pcf2_shard_combine_stage_output_path,
"threshold": 0
if isinstance(pc_instance.product_config, AttributionConfig)
# pyre-ignore Undefined attribute [16]
else pc_instance.product_config.k_anonymity_threshold,
"run_name": run_name,
"log_cost": self._log_cost_to_s3,
},
]
# We should only export visibility to scribe when it's set
if (
pc_instance.product_config.common.result_visibility
is not ResultVisibility.PUBLIC
):
result_visibility = int(pc_instance.product_config.common.result_visibility)
for arg in game_args:
arg["visibility"] = result_visibility

mpc_instance = await create_and_start_mpc_instance(
mpc_svc=self._mpc_service,
instance_id=pc_instance.infra_config.instance_id
+ "_combine_shards"
+ str(pc_instance.infra_config.retry_counter),
game_name=GameNames.PCF2_SHARD_COMBINER.value,
mpc_party=map_private_computation_role_to_mpc_party(
pc_instance.infra_config.role
),
num_containers=1,
binary_version=binary_config.binary_version,
server_ips=server_ips,
game_args=game_args,
container_timeout=self._container_timeout,
repository_path=binary_config.repository_path,
)

logging.info("MPC instance started running for PCF2.0 Shard Combiner.")

# Push MPC instance to PrivateComputationInstance.instances and update PL Instance status
pc_instance.infra_config.instances.append(
PCSMPCInstance.from_mpc_instance(mpc_instance)
)
return pc_instance

def get_status(
self,
pc_instance: PrivateComputationInstance,
) -> PrivateComputationInstanceStatus:
"""Updates the MPCInstances and gets latest PrivateComputationInstance status
Arguments:
private_computation_instance: The PC instance that is being updated
Returns:
The latest status for private_computation_instance
"""
return get_updated_pc_status_mpc_game(pc_instance, self._mpc_service)
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ class PrivateComputationServiceData:
service=None,
)

PCF2_SHARD_COMBINE_STAGE_DATA: StageData = StageData(
binary_name=OneDockerBinaryNames.PCF2_SHARD_COMBINER.value,
game_name=BINARY_NAME_TO_GAME_NAME[
OneDockerBinaryNames.PCF2_SHARD_COMBINER.value
],
service=None,
)

@classmethod
def get(
cls, game_type: PrivateComputationGameType
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from collections import defaultdict
from unittest import IsolatedAsyncioTestCase
from unittest.mock import AsyncMock, MagicMock, patch

from fbpcp.entity.mpc_instance import MPCParty
from fbpcs.common.entity.pcs_mpc_instance import PCSMPCInstance
from fbpcs.onedocker_binary_config import OneDockerBinaryConfig
from fbpcs.private_computation.entity.infra_config import (
InfraConfig,
PrivateComputationGameType,
)
from fbpcs.private_computation.entity.private_computation_instance import (
PrivateComputationInstance,
PrivateComputationInstanceStatus,
PrivateComputationRole,
)
from fbpcs.private_computation.entity.product_config import (
AttributionConfig,
CommonProductConfig,
LiftConfig,
ProductConfig,
)
from fbpcs.private_computation.repository.private_computation_game import GameNames
from fbpcs.private_computation.service.constants import NUM_NEW_SHARDS_PER_FILE
from fbpcs.private_computation.service.pcf2_shard_combiner_stage_service import (
ShardCombinerStageService,
)


class TestShardCombinerStageService(IsolatedAsyncioTestCase):
@patch("fbpcp.service.mpc.MPCService")
def setUp(self, mock_mpc_svc) -> None:
self.mock_mpc_svc = mock_mpc_svc
self.mock_mpc_svc.create_instance = MagicMock()

onedocker_binary_config_map = defaultdict(
lambda: OneDockerBinaryConfig(
tmp_directory="/test_tmp_directory/",
binary_version="latest",
repository_path="test_path/",
)
)
self.stage_svc = ShardCombinerStageService(
onedocker_binary_config_map, self.mock_mpc_svc
)

async def test_shard_combiner(self) -> None:
private_computation_instance = self._create_pc_instance()
mpc_instance = PCSMPCInstance.create_instance(
instance_id=private_computation_instance.infra_config.instance_id
+ "_aggregate_metrics0",
game_name=GameNames.LIFT.value,
mpc_party=MPCParty.CLIENT,
num_workers=private_computation_instance.infra_config.num_mpc_containers,
)

self.mock_mpc_svc.start_instance_async = AsyncMock(return_value=mpc_instance)

test_server_ips = [
f"192.0.2.{i}"
for i in range(private_computation_instance.infra_config.num_mpc_containers)
]
await self.stage_svc.run_async(private_computation_instance, test_server_ips)
test_game_args = [
{
"input_base_path": private_computation_instance.compute_stage_output_base_path,
"metrics_format_type": "lift",
"num_shards": private_computation_instance.infra_config.num_mpc_containers
* NUM_NEW_SHARDS_PER_FILE,
"output_path": private_computation_instance.pcf2_shard_combine_stage_output_path,
"threshold": 0
if isinstance(
private_computation_instance.product_config, AttributionConfig
)
# pyre-ignore Undefined attribute [16]
else private_computation_instance.product_config.k_anonymity_threshold,
"run_name": private_computation_instance.infra_config.instance_id
if self.stage_svc._log_cost_to_s3
else "",
"log_cost": True,
}
]

self.assertEqual(
GameNames.PCF2_SHARD_COMBINER.value,
self.mock_mpc_svc.create_instance.call_args[1]["game_name"],
)
self.assertEqual(
test_game_args,
self.mock_mpc_svc.create_instance.call_args[1]["game_args"],
)

self.assertEqual(
mpc_instance, private_computation_instance.infra_config.instances[0]
)

def _create_pc_instance(self) -> PrivateComputationInstance:
infra_config: InfraConfig = InfraConfig(
instance_id="test_instance_123",
role=PrivateComputationRole.PARTNER,
status=PrivateComputationInstanceStatus.COMPUTATION_COMPLETED,
status_update_ts=1600000000,
instances=[],
game_type=PrivateComputationGameType.LIFT,
num_pid_containers=2,
num_mpc_containers=2,
num_files_per_mpc_container=NUM_NEW_SHARDS_PER_FILE,
status_updates=[],
)
common: CommonProductConfig = CommonProductConfig(
input_path="456",
output_dir="789",
)
product_config: ProductConfig = LiftConfig(
common=common,
)
return PrivateComputationInstance(
infra_config=infra_config,
product_config=product_config,
)

0 comments on commit d40e295

Please sign in to comment.