From 010661d2c0e4a133e88db2b60c4d8d4c3941cf59 Mon Sep 17 00:00:00 2001
From: "Pant, Akshay"
Date: Fri, 10 Jan 2025 21:12:54 +0530
Subject: [PATCH 01/10] feat(secure aggregation): adds utility functions

---
 openfl/utilities/secagg/__init__.py |  14 +++
 openfl/utilities/secagg/crypto.py   | 146 ++++++++++++++++++++++++++++
 openfl/utilities/secagg/key.py      |  84 ++++++++++++++++
 openfl/utilities/secagg/shamir.py   |  94 ++++++++++++++++++
 4 files changed, 338 insertions(+)
 create mode 100644 openfl/utilities/secagg/__init__.py
 create mode 100644 openfl/utilities/secagg/crypto.py
 create mode 100644 openfl/utilities/secagg/key.py
 create mode 100644 openfl/utilities/secagg/shamir.py

diff --git a/openfl/utilities/secagg/__init__.py b/openfl/utilities/secagg/__init__.py
new file mode 100644
index 0000000000..a2265ac969
--- /dev/null
+++ b/openfl/utilities/secagg/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2020-2025 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+
+from openfl.utilities.secagg.crypto import (
+    create_ciphertext,
+    decipher_ciphertext,
+    pseudo_random_generator,
+)
+from openfl.utilities.secagg.key import generate_agreed_key, generate_key_pair
+from openfl.utilities.secagg.shamir import (
+    create_secret_shares,
+    reconstruct_secret
+)
diff --git a/openfl/utilities/secagg/crypto.py b/openfl/utilities/secagg/crypto.py
new file mode 100644
index 0000000000..79db60e6b1
--- /dev/null
+++ b/openfl/utilities/secagg/crypto.py
@@ -0,0 +1,146 @@
+# Copyright 2020-2025 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+
+"""
+This file contains utility functions for Secure Aggregation's cipher-related
+operations.
+"""
+
+from typing import Tuple, Union
+
+import numpy as np
+from Crypto.Cipher import AES
+from Crypto.Util.Padding import pad
+
+
+def create_ciphertext(
+    secret_key: bytes,
+    source_id: int,
+    destination_id: int,
+    seed_share: bytes,
+    key_share: bytes,
+) -> tuple[bytes, bytes, bytes]:
+    """
+    Creates a ciphertext using secret_key for collaborators source_id and
+    destination_id, covering a share of the private seed and a share of the
+    private key.
+
+    The function creates a byte string using the args such that
+    data = b'source_id.destination_id.seed_share.key_share'.
+    The "." serves as a separator so that all values used to create the
+    ciphertext can be easily distinguished when decrypting.
+
+    Args:
+        secret_key (bytes): Agreed key in bytes used to construct a cipher for
+            the encryption.
+        source_id (int): Unique integer ID of the collaborator creating the
+            ciphertext.
+        destination_id (int): Unique integer ID of the recipient collaborator
+            of the ciphertext.
+        seed_share (bytes): Share of source_id collaborator's private seed for
+            destination_id collaborator.
+        key_share (bytes): Share of source_id collaborator's private key for
+            destination_id collaborator.
+
+    Returns:
+        bytes: Ciphertext created using the args.
+        bytes: MAC tag for the ciphertext which can be used for verification.
+        bytes: Nonce used for generating the cipher, required for decryption.
+    """
+    # Converting the integer collaborator IDs to bytes.
+    source_id_bytes = source_id.to_bytes(4, byteorder="big")
+    destination_id_bytes = destination_id.to_bytes(4, byteorder="big")
+    # Generate the byte string to be encrypted.
+    data = (
+        source_id_bytes + b"." + destination_id_bytes + b"." +
+        seed_share + b"." + key_share
+    )
+    # AES cipher requires the secret key to be of a certain length.
+    # We use 64 bytes as it is the maximum length available.
+    padded_secret_key = pad(secret_key, 64)
+
+    from Crypto.Random import get_random_bytes
+
+    # Generate a random nonce to make the encryption non-deterministic.
+    nonce = get_random_bytes(len(padded_secret_key) // 2)
+    # Generate a ciphertext using symmetric block cipher.
+    cipher = AES.new(padded_secret_key, AES.MODE_SIV, nonce=nonce)
+    ciphertext, mac = cipher.encrypt_and_digest(data)
+
+    return ciphertext, mac, nonce
+
+
+def decipher_ciphertext(
+    secret_key: bytes, ciphertext: bytes, mac: bytes, nonce: bytes
+) -> tuple[int, int, bytes, bytes]:
+    """
+    Decrypt a ciphertext to get the values used to create it.
+
+    The function uses the nonce generated during creation of the ciphertext
+    to create a cipher. This cipher is used to decrypt the ciphertext and
+    verify it using the MAC tag, which was also generated during creation of
+    the ciphertext.
+
+    Args:
+        secret_key (bytes): Agreed key in bytes used to construct a cipher for
+            the encryption.
+        ciphertext (bytes): Ciphertext to be decrypted.
+        mac (bytes): MAC tag for the ciphertext which is used for verification.
+        nonce (bytes): Nonce used when the cipher was generated, required for
+            decryption.
+
+    Returns:
+        int: Unique integer ID of the collaborator that created the ciphertext.
+        int: Unique integer ID of the recipient collaborator of the ciphertext.
+        bytes: Share of source_id collaborator's private seed for
+            destination_id collaborator.
+        bytes: Share of source_id collaborator's private key for
+            destination_id collaborator.
+    """
+    # Recreate the secret key used for encryption by adding the extra padding.
+    padded_secret_key = pad(secret_key, 64)
+    # Recreate the cipher used for encryption.
+    cipher = AES.new(padded_secret_key, AES.MODE_SIV, nonce=nonce)
+
+    data = cipher.decrypt_and_verify(ciphertext, mac)
+    # Split the decrypted data on the separator ".".
+    # data = b'source_id.destination_id.seed_share.key_share'
+    data = data.split(b".")
+
+    return (
+        # Convert the collaborator IDs to int.
+        int.from_bytes(data[0], "big"),
+        int.from_bytes(data[1], "big"),
+        data[2],
+        data[3],
+    )
+
+
+def pseudo_random_generator(
+    seed: Union[int, float, bytes],
+    shape: Tuple,
+) -> np.ndarray:
+    """
+    Generates a random mask using a seed value passed as arg.
+
+    Args:
+        seed (Union[int, float, bytes]): Seed to be used for generating a
+            pseudo-random number.
+        shape (Tuple): Shape of the numpy array to be generated.
+
+    Returns:
+        np.ndarray: Array with pseudo-randomly generated numbers.
+    """
+    if isinstance(seed, (bytes, float)):
+        # numpy's default_rng only accepts integer seeds, so byte-string and
+        # float seeds are first turned into a deterministic integer by
+        # seeding Python's pseudo-random generator with them.
+        import random
+
+        random.seed(seed)
+        seed = random.getrandbits(32)
+
+    # Seed numpy random generator.
+    rng = np.random.default_rng(seed=seed)
+
+    return rng.random(shape)
diff --git a/openfl/utilities/secagg/key.py b/openfl/utilities/secagg/key.py
new file mode 100644
index 0000000000..d8c8d9cef4
--- /dev/null
+++ b/openfl/utilities/secagg/key.py
@@ -0,0 +1,84 @@
+# Copyright 2020-2025 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+
+"""
+This file contains utility functions for Secure Aggregation key operations.
+"""
+
+from Crypto.PublicKey import ECC
+
+
+def generate_key_pair(curve: str = "ed25519") -> tuple[ECC.EccKey, str]:
+    """
+    Generates a public-private key pair for a specific curve.
+
+    Args:
+        curve (str, optional): The curve to use for generating the key pair.
+            Defaults to "ed25519".
+
+    Returns:
+        ECC.EccKey: Private key in pycryptodome format.
+        str: Public key in PEM format.
+    """
+    # Generate private key.
+    key = ECC.generate(curve=curve)
+    # Export the public key in PEM format.
+    public_key = key.public_key().export_key(format="PEM")
+
+    return key, public_key
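+
+# Example usage of the two helpers in this module (illustrative, with a
+# hypothetical peer):
+#
+#     private_key, public_key_pem = generate_key_pair()
+#     shared = generate_agreed_key(private_key, peer_public_key_pem)
+#
+# Both peers derive the same value because the key derivation below uses a
+# fixed salt.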
+
+
+def generate_agreed_key(
+    private_key: ECC.EccKey,
+    public_key: bytes,
+    key_count: int = 1,
+    key_length: int = 32,
+) -> bytes:
+    """
+    Uses Diffie-Hellman key agreement to generate an agreed key between a pair
+    of public-private keys.
+
+    Args:
+        private_key (ECC.EccKey): Private key in pycryptodome format to be
+            used for key agreement.
+        public_key (bytes): Public key of the peer to be used for key
+            agreement.
+        key_count (int, optional): Number of agreed keys to be generated.
+            Defaults to 1.
+        key_length (int, optional): Size of each key in bytes to be generated.
+            Defaults to 32.
+
+    Returns:
+        bytes: Agreed key between the two keys shared in args.
+    """
+    import functools
+
+    from Crypto.Hash import SHA256
+    from Crypto.Protocol.KDF import HKDF
+
+    # Key derivation function.
+    kdf = functools.partial(
+        HKDF,
+        key_len=key_length,
+        # Ideally, salt should be as long as the digest size of the chosen
+        # hash; SHA256 has a 32 byte (256 bit) digest. The salt must be
+        # identical on both sides of the agreement, otherwise the two parties
+        # would derive different keys, so a fixed all-zero salt is used.
+        salt=b"\x00" * 32,
+        hashmod=SHA256,
+        num_keys=key_count,
+    )
+
+    from Crypto.Protocol.DH import key_agreement
+
+    # Using Diffie-Hellman key agreement.
+    key = key_agreement(
+        static_priv=private_key,
+        static_pub=ECC.import_key(public_key),
+        kdf=kdf
+    )
+
+    return key
diff --git a/openfl/utilities/secagg/shamir.py b/openfl/utilities/secagg/shamir.py
new file mode 100644
index 0000000000..3ad6256a57
--- /dev/null
+++ b/openfl/utilities/secagg/shamir.py
@@ -0,0 +1,94 @@
+# Copyright 2020-2025 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+
+"""
+This file contains utility functions for Secure Aggregation's Shamir secret
+sharing operations.
+"""
+
+from Crypto.Protocol.SecretSharing import Shamir
+
+# The length of each chunk of secret used for creation of shares.
+SECRET_CHUNK_LENGTH = 16
+
+
+def create_secret_shares(
+    secret: bytes,
+    count: int,
+    threshold: int,
+) -> dict[int, list]:
+    """
+    Creates shares for the secret using Shamir's secret sharing protocol.
+
+    pycryptodome's Shamir.split requires the secret to be of length
+    SECRET_CHUNK_LENGTH. Since the secret cannot always be expected to have
+    this length, we split the secret byte string into multiple byte strings,
+    each of length SECRET_CHUNK_LENGTH, and create shares for each of the
+    SECRET_CHUNK_LENGTH-sized chunks.
+    During reconstruction, the SECRET_CHUNK_LENGTH-sized chunks are recreated
+    using the respective shares and then concatenated to form the original
+    secret. Each collaborator index therefore holds multiple shares (one for
+    each chunk) of a single secret.
+
+    Args:
+        secret (bytes): Secret for which shares are to be created.
+        count (int): Number of shares to be created for the secret.
+        threshold (int): Minimum threshold of shares required for
+            reconstruction of the secret.
+
+    Returns:
+        dict: Contains the mapping of the index to the shares that belong to
+            the respective index.
+    """
+    # TODO: Generate a digest to verify that the secret share has not been
+    # modified.
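+    # For example (illustrative sizes): a 20-byte secret is padded to 32
+    # bytes and split into two 16-byte chunks; with count=3 the result maps
+    # indices 1..3 to one share per chunk, e.g.
+    # shares[2] == [share_2_of_chunk_0, share_2_of_chunk_1].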
+ shares = {} + + from Crypto.Util.Padding import pad + + # Pad the secret to create a byte string of a length which is a multiple + # of SECRET_CHUNK_LENGTH. + secret = pad(secret, SECRET_CHUNK_LENGTH) + # Divide the secret into multiple chunks. + secret_chunks = [ + secret[i: i + SECRET_CHUNK_LENGTH] + for i in range(0, len(secret), SECRET_CHUNK_LENGTH) + ] + # Create shares for each of the chunk. + for chunk in secret_chunks: + chunk_shares = Shamir.split(threshold, count, chunk) + # Map the respective chunk share to the id it belongs to. + for id, share in chunk_shares: + if id not in shares: + shares[id] = [] + shares[id].append(share) + + return shares + + +def reconstruct_secret(shares: dict) -> bytes: + """ + Args: + shares (dict): Contains the mapping of the index to the chunk shares + that belong to the respective index. + + Returns: + bytes: Secret reconstructed from the chunk shares. + """ + secret = b"" + + total_chunks = max(len(share) for share in shares.values()) + for chunk_index in range(total_chunks): + # Create a list for the respective chunk with all the shares. + chunk_shares = [(key, shares[key][chunk_index]) for key in shares] + # Reconstruct the chunk of the secret. + secret_chunk = Shamir.combine(chunk_shares) + # Concatenate the chunk to the secret. + secret += secret_chunk + + from Crypto.Util.Padding import unpad + + # Remove the padding from the secret. + secret = unpad(secret, SECRET_CHUNK_LENGTH) + + return secret From 43a2c93a36d67bd089283f736b32f91f8b1faf77 Mon Sep 17 00:00:00 2001 From: "Pant, Akshay" Date: Fri, 10 Jan 2025 21:14:09 +0530 Subject: [PATCH 02/10] feat(secure aggregation): tasks WIP --- openfl/federated/task/runner_sa.py | 340 +++++++++++++++++++++++++++++ 1 file changed, 340 insertions(+) create mode 100644 openfl/federated/task/runner_sa.py diff --git a/openfl/federated/task/runner_sa.py b/openfl/federated/task/runner_sa.py new file mode 100644 index 0000000000..2661f03cf2 --- /dev/null +++ b/openfl/federated/task/runner_sa.py @@ -0,0 +1,340 @@ +# Copyright 2020-2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + + +import random +import struct + +import numpy as np + +from openfl.utilities import TensorKey +from openfl.utilities.secagg import ( + create_ciphertext, + create_secret_shares, + decipher_ciphertext, + generate_agreed_key, + generate_key_pair, + pseudo_random_generator +) + + +class SATaskRunner: + def generate_keys( + self, + col_name, + round_number, + input_tensor_dict: dict, + **kwargs, + ): + """ + Generates a pair of private and public keys and returns them in + dictionaries. + + Args: + col_name (str): The column name associated with the keys. + round_number (int): The round number for which the keys are + generated. + input_tensor_dict (dict): A dictionary of input tensors. + **kwargs: Additional keyword arguments. + + Returns: + tuple: A tuple containing two dictionaries: + - local_tensor_dict (dict): A dictionary with private keys and + a random seed. + - global_tensor_dict (dict): A dictionary with public keys. 
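+
+        The private keys never leave the collaborator; only the public keys
+        are shared with the aggregator for distribution to the other
+        collaborators.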
+ """ + private_key1, public_key1 = generate_key_pair() + private_key2, public_key2 = generate_key_pair() + + local_tensor_dict = { + TensorKey( + "private_key", col_name, round_number, False, + ("private_key") + ): [private_key1, private_key2], + TensorKey( + "private_seed", col_name, round_number, False, () + ): [random.random()] + } + + global_tensor_dict = { + TensorKey( + "public_key_local", col_name, round_number, False, + ("public_key") + ): [public_key1, public_key2] + } + + return local_tensor_dict, global_tensor_dict + + def generate_ciphertexts( + self, + col_name, + round_number, + input_tensor_dict: dict, + **kwargs, + ): + """ + Generates ciphertexts for secure multi-party computation. + + This method generates ciphertexts for each collaborator using their + public keys and the local private key. It creates secret shares for a + private seed and the private key, then generates agreed keys and + ciphertexts for each collaborator. + + Required tensors for the task include: + - GLOBAL public_key + - public_key_local + + Args: + col_name (str): The column name for the tensor key. + round_number (int): The current round number. + input_tensor_dict (dict): A dictionary containing the required + tensors: + - "public_key": List of public keys for all collaborators. + - "public_key_local": The local public key. + + Returns: + tuple: A tuple containing two dictionaries: + - The first dictionary contains the global output with the + tensor key for ciphertexts. + - The second dictionary contains the local output with tensor + keys for local ciphertexts and the index of the current + collaborator. + """ + global_output = [] + local_output = [] + # public_key is in the format. + # [ + # [collaborator_index, public_key_1, public_key_2], + # [collaborator_index, public_key_1, public_key_2], + # ... + # ] + public_keys = input_tensor_dict["public_key"] + # Get the total number of collaborators participating. + collaborator_count = len(public_keys) + # Get the index of the collaborator by matching the public key. + index_current = -1 + for tensor in public_keys: + if tensor[1] == input_tensor_dict["public_key_local"][0]: + index_current = tensor[0] + break + # Generate a private seed for the collaborator. + private_seed = random.random() + # Create secret shares for the private seed. + seed_shares = create_secret_shares( + # Converts the floating-point number private_seed into an 8-byte + # binary representation. + struct.pack('d', private_seed), + collaborator_count, + collaborator_count, + ) + # Create secret shares for the private key. + key_shares = create_secret_shares( + str.encode(self._private_keys[0].export_key(format="PEM")), + collaborator_count, + collaborator_count, + ) + # Create cipher-texts for each collaborator. + for collaborator_tensor in public_keys: + collab_index = collaborator_tensor[0] + collab_public_key_1 = collaborator_tensor[1] + collab_public_key_2 = collaborator_tensor[2] + # Generate agreed keys for both the public keys. + agreed_key_1 = generate_agreed_key( + self._private_keys[0], collab_public_key_1 + ) + agreed_key_2 = generate_agreed_key( + self._private_keys[1], collab_public_key_2 + ) + # Generate ciphertext for the collaborator. + ciphertext, mac, nonce = create_ciphertext( + agreed_key_1, + index_current, + collab_index, + seed_shares[collab_index], + key_shares[collab_index] + ) + # Local cache for collaborator ID x contains a list which contains + # [x, ciphertext_for_x, mac_for_x, nonce_for_x, + # agreed_key_1_with_x, agreed_key_2_with_x]. 
+ local_output.append( + [ + collab_index, ciphertext, mac, nonce, + agreed_key_1, agreed_key_2 + ] + ) + # Result sent to aggregator contains a row for each collaborator + # such that [source_id, destination_id, ciphertext_source_to_dest]. + global_output.append( + [index_current, collab_index, ciphertext] + ) + + return { + TensorKey( + "ciphertext", col_name, round_number, False, + ("ciphertext") + ): global_output + }, { + TensorKey( + "ciphertext_local", col_name, round_number, False, + ("ciphertext") + ): local_output, + TensorKey( + "index", col_name, round_number, False, () + ): [index_current], + } + + def decrypt_ciphertexts( + + self, + col_name, + round_number, + input_tensor_dict: dict, + **kwargs, + ): + """ + Decrypts the provided ciphertexts and returns the deciphered outputs. + + Required tensors for the task include: + - GLOBAL ciphertext. + - index + - ciphertext_local + + Args: + col_name (str): The name of the column. + round_number (int): The current round number. + input_tensor_dict (dict): A dictionary containing the required + tensors: + - "ciphertext": List of ciphertexts in the format + [[source_collaborator_id, destination_collaborator_id, + ciphertext], ...]. + - "index": The current index. + - "ciphertext_local": Local ciphertext information. + + Returns: + tuple: A tuple containing: + - dict: A dictionary with the key as TensorKey and value as + the list of deciphered outputs. + - dict: An empty dictionary (reserved for future use). + """ + global_output = [] + # ciphertexts is in format + # [ + # [source_collaborator_id, destination_collaborator_id, ciphertext] + # [source_collaborator_id, destination_collaborator_id, ciphertext] + # ... + # ] + ciphertexts = input_tensor_dict["ciphertext"] + index_current = input_tensor_dict["index"] + ciphertext_local = input_tensor_dict["ciphertext_local"] + addressed_ciphertexts = self._filter_ciphertexts( + ciphertexts, index_current + ) + + for ciphertext in addressed_ciphertexts: + source_index = ciphertext[0] + cipher_details = self._fetch_collaborator_ciphertext( + source_index, ciphertext_local + ) + _, _, seed_share, key_share = decipher_ciphertext( + cipher_details[4], # agreed_key_1 + ciphertext[2], # ciphertext + cipher_details[2], # mac + cipher_details[3], # nonce + ) + global_output.append( + source_index, index_current, seed_share, key_share + ) + + return { + TensorKey( + "deciphertext", col_name, round_number, False, + ("deciphertext") + ): global_output + }, {} + + def _filter_ciphertexts(self, ciphertexts, index_current): + """ + Filters the given list of ciphertexts to include only those that match + the specified index. + + Args: + ciphertexts (list): A list of ciphertexts, where each ciphertext + is expected to be a list or tuple. + index_current (int): The index to filter the ciphertexts by. + + Returns: + list: A list of filtered ciphertexts that match the specified + index. + """ + filtered_ciphertexts = [] + for ciphertext in ciphertexts: + if ciphertext[1] == index_current: + filtered_ciphertexts.append(ciphertext) + + return filtered_ciphertexts + + def _fetch_collaborator_ciphertext( + self, collaborator_id, ciphertext_local + ): + """ + Fetches the ciphertext associated with a specific collaborator. + + Args: + collaborator_id (str): The ID of the collaborator whose ciphertext + is to be fetched. + ciphertext_local (list): A list of ciphertexts, where each + ciphertext is a tuple containing a collaborator ID and the + corresponding ciphertext. 
+
+        Returns:
+            The ciphertext associated with the given collaborator ID, or None
+            if no match is found.
+        """
+        for ciphertext in ciphertext_local:
+            if ciphertext[0] == collaborator_id:
+                return ciphertext
+
+    def _mask(
+        self,
+        index,
+        private_seed,
+        ciphertext_local,
+        gradient: np.ndarray,
+    ):
+        """
+        Apply a mask to the gradient using shared and private seeds.
+
+        This function modifies the input gradient by adding masks generated
+        from shared keys and a private seed. The shared masks are generated
+        using the keys from `ciphertext_local` and the private mask is
+        generated using the `private_seed`.
+
+        Args:
+            index (int): The index of the current collaborator.
+            private_seed (Any): The private seed used to generate the private
+                mask.
+            ciphertext_local (list): A list of ciphertexts, where each
+                ciphertext contains information about the shared keys and
+                indices of collaborators.
+            gradient (np.ndarray): The gradient to be masked.
+
+        Returns:
+            np.ndarray: The masked gradient.
+        """
+        shape = gradient.shape
+
+        for ciphertext in ciphertext_local:
+            # ciphertext[4] is the agreed key for the two collaborators.
+            shared_mask = pseudo_random_generator(ciphertext[4], shape)
+            if index < ciphertext[0]:
+                shared_mask *= -1
+            elif index == ciphertext[0]:
+                shared_mask = 0
+            # Add masks for all the collaborators.
+            gradient = np.add(gradient, shared_mask)
+
+        # Generate private mask for the collaborator.
+        private_mask = pseudo_random_generator(private_seed, shape)
+        gradient = np.add(gradient, private_mask)
+
+        return gradient

From 6f81222d439c0dee87941f65f106b564b8a3ab5f Mon Sep 17 00:00:00 2001
From: "Pant, Akshay"
Date: Fri, 10 Jan 2025 21:14:30 +0530
Subject: [PATCH 03/10] feat(secure aggregation): aggregation function WIP

---
 .../secure_aggregation.py | 124 ++++++++++++++++++
 1 file changed, 124 insertions(+)
 create mode 100644 openfl/interface/aggregation_functions/secure_aggregation.py

diff --git a/openfl/interface/aggregation_functions/secure_aggregation.py b/openfl/interface/aggregation_functions/secure_aggregation.py
new file mode 100644
index 0000000000..141214d4ff
--- /dev/null
+++ b/openfl/interface/aggregation_functions/secure_aggregation.py
@@ -0,0 +1,124 @@
+# Copyright 2020-2025 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+
+"""Secure aggregation module."""
+
+from typing import Iterator, Tuple
+
+import numpy as np
+import pandas as pd
+
+from openfl.interface.aggregation_functions.core import AggregationFunction
+from openfl.utilities import LocalTensor
+
+
+class SecureAggregation(AggregationFunction):
+    """
+    SecureAggregation class for performing secure aggregation of local tensors.
+    """
+    def call(
+        self,
+        local_tensors: list[LocalTensor],
+        db_iterator: Iterator[pd.Series],
+        tensor_name: str,
+        fl_round: int,
+        tags: Tuple[str],
+    ) -> np.ndarray:
+        """
+        Perform secure aggregation by calling the appropriate aggregation
+        methods based on the tags.
+
+        Args:
+            local_tensors (list[LocalTensor]): List of local tensors to be
+                aggregated.
+            db_iterator (Iterator[pd.Series]): Iterator over the database
+                series.
+            tensor_name (str): Name of the tensor.
+            fl_round (int): Federated learning round number.
+            tags (Tuple[str]): Tags indicating the type of aggregation to
+                perform.
+
+        Returns:
+            np.ndarray: Aggregated tensor.
+ """ + self._aggregate_public_keys(local_tensors, tags) + self._aggregate_ciphertexts(local_tensors, tags) + self._aggregate_deciphertexts(local_tensors, tags) + + def _aggregate_public_keys( + self, + local_tensors: list[LocalTensor], + tags: Tuple[str], + ): + """ + Aggregate public keys from the local tensors. + + Args: + local_tensors (list[LocalTensor]): List of local tensors + containing public keys. + tags (Tuple[str]): Tags indicating the type of aggregation to + perform. + + Returns: + np.ndarray: Aggregated public keys tensor. + """ + aggregated_tensor = [] + if "public_key" in tags: + # Setting indices for the collaborators. + index = 1 + for tensor in local_tensors: + # tensor[0] is public_key_1 + # tensor[1] is public_key_2 + aggregated_tensor.append( + [index, tensor.tensor[0], tensor.tensor[1]] + ) + index += 1 + + return np.array(aggregated_tensor) + + def _aggregate_ciphertexts( + self, + local_tensors: list[LocalTensor], + tags: Tuple[str], + ): + """ + Aggregate ciphertexts from the local tensors. + + Args: + local_tensors (list[LocalTensor]): List of local tensors + containing ciphertexts. + tags (Tuple[str]): Tags indicating the type of aggregation to + perform. + + Returns: + np.ndarray: Aggregated ciphertexts tensor. + """ + aggregated_tensor = [] + if "ciphertext" in tags: + aggregated_tensor = [tensor.tensor for tensor in local_tensors] + + return np.array(aggregated_tensor) + + def _aggregate_deciphertexts( + self, + local_tensors: list[LocalTensor], + tags: Tuple[str], + ) -> np.ndarray: + """ + Aggregate deciphertexts from the local tensors. + + Args: + local_tensors (list[LocalTensor]): List of local tensors + containing deciphertexts. + tags (Tuple[str]): Tags indicating the type of aggregation to + perform. + + Returns: + np.ndarray: Aggregated deciphertexts tensor. 
+ """ + aggregated_tensor = [] + if "deciphertext" in tags: + aggregated_tensor = [tensor.tensor for tensor in local_tensors] + + return np.array(aggregated_tensor) From d4219afedc280f1961f055b568372efcf76ac985 Mon Sep 17 00:00:00 2001 From: "Pant, Akshay" Date: Mon, 13 Jan 2025 14:30:22 +0530 Subject: [PATCH 04/10] restructure: formatting changes Signed-off-by: Pant, Akshay --- openfl/federated/task/runner_sa.py | 90 ++++++++++++----------------- openfl/utilities/secagg/__init__.py | 5 +- openfl/utilities/secagg/crypto.py | 5 +- openfl/utilities/secagg/key.py | 6 +- openfl/utilities/secagg/shamir.py | 3 +- 5 files changed, 41 insertions(+), 68 deletions(-) diff --git a/openfl/federated/task/runner_sa.py b/openfl/federated/task/runner_sa.py index 2661f03cf2..da78ea283e 100644 --- a/openfl/federated/task/runner_sa.py +++ b/openfl/federated/task/runner_sa.py @@ -14,7 +14,7 @@ decipher_ciphertext, generate_agreed_key, generate_key_pair, - pseudo_random_generator + pseudo_random_generator, ) @@ -47,20 +47,18 @@ def generate_keys( private_key2, public_key2 = generate_key_pair() local_tensor_dict = { - TensorKey( - "private_key", col_name, round_number, False, - ("private_key") - ): [private_key1, private_key2], - TensorKey( - "private_seed", col_name, round_number, False, () - ): [random.random()] + TensorKey("private_key", col_name, round_number, False, ("private_key")): [ + private_key1, + private_key2, + ], + TensorKey("private_seed", col_name, round_number, False, ()): [random.random()], } global_tensor_dict = { - TensorKey( - "public_key_local", col_name, round_number, False, - ("public_key") - ): [public_key1, public_key2] + TensorKey("public_key_local", col_name, round_number, False, ("public_key")): [ + public_key1, + public_key2, + ] } return local_tensor_dict, global_tensor_dict @@ -123,7 +121,7 @@ def generate_ciphertexts( seed_shares = create_secret_shares( # Converts the floating-point number private_seed into an 8-byte # binary representation. - struct.pack('d', private_seed), + struct.pack("d", private_seed), collaborator_count, collaborator_count, ) @@ -139,52 +137,34 @@ def generate_ciphertexts( collab_public_key_1 = collaborator_tensor[1] collab_public_key_2 = collaborator_tensor[2] # Generate agreed keys for both the public keys. - agreed_key_1 = generate_agreed_key( - self._private_keys[0], collab_public_key_1 - ) - agreed_key_2 = generate_agreed_key( - self._private_keys[1], collab_public_key_2 - ) + agreed_key_1 = generate_agreed_key(self._private_keys[0], collab_public_key_1) + agreed_key_2 = generate_agreed_key(self._private_keys[1], collab_public_key_2) # Generate ciphertext for the collaborator. ciphertext, mac, nonce = create_ciphertext( agreed_key_1, index_current, collab_index, seed_shares[collab_index], - key_shares[collab_index] + key_shares[collab_index], ) # Local cache for collaborator ID x contains a list which contains # [x, ciphertext_for_x, mac_for_x, nonce_for_x, # agreed_key_1_with_x, agreed_key_2_with_x]. - local_output.append( - [ - collab_index, ciphertext, mac, nonce, - agreed_key_1, agreed_key_2 - ] - ) + local_output.append([collab_index, ciphertext, mac, nonce, agreed_key_1, agreed_key_2]) # Result sent to aggregator contains a row for each collaborator # such that [source_id, destination_id, ciphertext_source_to_dest]. 
- global_output.append( - [index_current, collab_index, ciphertext] - ) + global_output.append([index_current, collab_index, ciphertext]) return { - TensorKey( - "ciphertext", col_name, round_number, False, - ("ciphertext") - ): global_output + TensorKey("ciphertext", col_name, round_number, False, ("ciphertext")): global_output }, { TensorKey( - "ciphertext_local", col_name, round_number, False, - ("ciphertext") + "ciphertext_local", col_name, round_number, False, ("ciphertext") ): local_output, - TensorKey( - "index", col_name, round_number, False, () - ): [index_current], + TensorKey("index", col_name, round_number, False, ()): [index_current], } def decrypt_ciphertexts( - self, col_name, round_number, @@ -226,29 +206,25 @@ def decrypt_ciphertexts( ciphertexts = input_tensor_dict["ciphertext"] index_current = input_tensor_dict["index"] ciphertext_local = input_tensor_dict["ciphertext_local"] - addressed_ciphertexts = self._filter_ciphertexts( - ciphertexts, index_current - ) + addressed_ciphertexts = self._filter_ciphertexts(ciphertexts, index_current) for ciphertext in addressed_ciphertexts: source_index = ciphertext[0] - cipher_details = self._fetch_collaborator_ciphertext( - source_index, ciphertext_local - ) + cipher_details = self._fetch_collaborator_ciphertext(source_index, ciphertext_local) _, _, seed_share, key_share = decipher_ciphertext( cipher_details[4], # agreed_key_1 ciphertext[2], # ciphertext cipher_details[2], # mac cipher_details[3], # nonce ) - global_output.append( - source_index, index_current, seed_share, key_share - ) + # Result sent to aggregator contains a row for each collaborator + # such that [source_id, destination_id, source_seed_dest_share, + # source_key_dest_share]. + global_output.append(source_index, index_current, seed_share, key_share) return { TensorKey( - "deciphertext", col_name, round_number, False, - ("deciphertext") + "deciphertext", col_name, round_number, False, ("deciphertext") ): global_output }, {} @@ -266,16 +242,19 @@ def _filter_ciphertexts(self, ciphertexts, index_current): list: A list of filtered ciphertexts that match the specified index. """ + # ciphertexts contains a list of global ciphertext values such that + # each row contains + # [source_id, destination_id, ciphertext_source_to_dest] filtered_ciphertexts = [] for ciphertext in ciphertexts: + # Filter the ciphertexts addressed to the "index_current" + # collaborator. if ciphertext[1] == index_current: filtered_ciphertexts.append(ciphertext) return filtered_ciphertexts - def _fetch_collaborator_ciphertext( - self, collaborator_id, ciphertext_local - ): + def _fetch_collaborator_ciphertext(self, collaborator_id, ciphertext_local): """ Fetches the ciphertext associated with a specific collaborator. @@ -290,7 +269,12 @@ def _fetch_collaborator_ciphertext( The ciphertext associated with the given collaborator ID, or None if no match is found. """ + # ciphertext_local contains a list of ciphertext related values in a + # list format. This list is formatted such that for dest collaborator x + # [x, ciphertext_for_x, mac_for_x, nonce_for_x, + # agreed_key_1_with_x, agreed_key_2_with_x]. for ciphertext in ciphertext_local: + # Filter the ciphertexts for collaborator_id. 
if ciphertext[0] == collaborator_id: return ciphertext diff --git a/openfl/utilities/secagg/__init__.py b/openfl/utilities/secagg/__init__.py index a2265ac969..44b2fd8480 100644 --- a/openfl/utilities/secagg/__init__.py +++ b/openfl/utilities/secagg/__init__.py @@ -8,7 +8,4 @@ pseudo_random_generator, ) from openfl.utilities.secagg.key import generate_agreed_key, generate_key_pair -from openfl.utilities.secagg.shamir import ( - create_secret_shares, - reconstruct_secret -) +from openfl.utilities.secagg.shamir import create_secret_shares, reconstruct_secret diff --git a/openfl/utilities/secagg/crypto.py b/openfl/utilities/secagg/crypto.py index 79db60e6b1..fe43f5d7e2 100644 --- a/openfl/utilities/secagg/crypto.py +++ b/openfl/utilities/secagg/crypto.py @@ -52,10 +52,7 @@ def create_ciphertext( source_id_bytes = source_id.to_bytes(4, byteorder="big") destination_id_bytes = destination_id.to_bytes(4, byteorder="big") # Generate the byte string to be encrypted. - data = ( - source_id_bytes + b"." + destination_id_bytes + b"." + - seed_share + b"." + key_share - ) + data = source_id_bytes + b"." + destination_id_bytes + b"." + seed_share + b"." + key_share # AES cipher requires the secret key to be of a certain length. # We use 64 bytes as it is the maximum length available. padded_secret_key = pad(secret_key, 64) diff --git a/openfl/utilities/secagg/key.py b/openfl/utilities/secagg/key.py index d8c8d9cef4..ee36404a82 100644 --- a/openfl/utilities/secagg/key.py +++ b/openfl/utilities/secagg/key.py @@ -75,10 +75,6 @@ def generate_agreed_key( from Crypto.Protocol.DH import key_agreement # Using Diffie-Hellman key agreement. - key = key_agreement( - static_priv=private_key, - static_pub=ECC.import_key(public_key), - kdf=kdf - ) + key = key_agreement(static_priv=private_key, static_pub=ECC.import_key(public_key), kdf=kdf) return key diff --git a/openfl/utilities/secagg/shamir.py b/openfl/utilities/secagg/shamir.py index 3ad6256a57..efa3fedc36 100644 --- a/openfl/utilities/secagg/shamir.py +++ b/openfl/utilities/secagg/shamir.py @@ -51,8 +51,7 @@ def create_secret_shares( secret = pad(secret, SECRET_CHUNK_LENGTH) # Divide the secret into multiple chunks. secret_chunks = [ - secret[i: i + SECRET_CHUNK_LENGTH] - for i in range(0, len(secret), SECRET_CHUNK_LENGTH) + secret[i : i + SECRET_CHUNK_LENGTH] for i in range(0, len(secret), SECRET_CHUNK_LENGTH) ] # Create shares for each of the chunk. for chunk in secret_chunks: From 6e23793623e14640f86b4d083f56560f02d9b5b0 Mon Sep 17 00:00:00 2001 From: "Pant, Akshay" Date: Mon, 13 Jan 2025 14:31:33 +0530 Subject: [PATCH 05/10] feat(secure aggregation): adds logic for reconstruction of secrets at the aggregator Signed-off-by: Pant, Akshay --- .../secure_aggregation.py | 115 +++++++++++++++--- 1 file changed, 98 insertions(+), 17 deletions(-) diff --git a/openfl/interface/aggregation_functions/secure_aggregation.py b/openfl/interface/aggregation_functions/secure_aggregation.py index 141214d4ff..734bad2489 100644 --- a/openfl/interface/aggregation_functions/secure_aggregation.py +++ b/openfl/interface/aggregation_functions/secure_aggregation.py @@ -11,12 +11,14 @@ from openfl.interface.aggregation_functions.core import AggregationFunction from openfl.utilities import LocalTensor +from openfl.utilities.secagg import generate_agreed_key, reconstruct_secret class SecureAggregation(AggregationFunction): """ SecureAggregation class for performing secure aggregation of local tensors. 
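+    The aggregation behavior is dispatched on the tensor tags produced by
+    the secure aggregation setup tasks ("public_key", "ciphertext",
+    "deciphertext").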
""" + def call( self, local_tensors: list[LocalTensor], @@ -44,7 +46,7 @@ def call( """ self._aggregate_public_keys(local_tensors, tags) self._aggregate_ciphertexts(local_tensors, tags) - self._aggregate_deciphertexts(local_tensors, tags) + self._rebuild_secrets(db_iterator, local_tensors, tags) def _aggregate_public_keys( self, @@ -61,7 +63,12 @@ def _aggregate_public_keys( perform. Returns: - np.ndarray: Aggregated public keys tensor. + np.ndarray: Aggregated public keys tensor in format + [ + [collaborator_id, public_key_1, public_2], + [collaborator_id, public_key_1, public_2], + ... + ] """ aggregated_tensor = [] if "public_key" in tags: @@ -70,9 +77,7 @@ def _aggregate_public_keys( for tensor in local_tensors: # tensor[0] is public_key_1 # tensor[1] is public_key_2 - aggregated_tensor.append( - [index, tensor.tensor[0], tensor.tensor[1]] - ) + aggregated_tensor.append([index, tensor.tensor[0], tensor.tensor[1]]) index += 1 return np.array(aggregated_tensor) @@ -92,7 +97,12 @@ def _aggregate_ciphertexts( perform. Returns: - np.ndarray: Aggregated ciphertexts tensor. + np.ndarray: Aggregated ciphertexts tensor in format + [ + [source_collaborator_id, destination_collaborator_id, ciphertext], + [source_collaborator_id, destination_collaborator_id, ciphertext], + ... + ] """ aggregated_tensor = [] if "ciphertext" in tags: @@ -100,25 +110,96 @@ def _aggregate_ciphertexts( return np.array(aggregated_tensor) - def _aggregate_deciphertexts( + def _rebuild_secrets( self, + db_iterator, local_tensors: list[LocalTensor], tags: Tuple[str], - ) -> np.ndarray: + ): """ - Aggregate deciphertexts from the local tensors. + Rebuild secrets from decrypted ciphertext tensors to unmask gradient + vectors. + + This method processes a list of local tensors to reconstruct the + secrets (seeds and private keys) required for unmasking gradient + vectors. It creates dictionaries to store seed shares and key shares + for each source collaborator, then reconstructs the secrets for all + source collaborators. Finally, it generates the agreed keys for all + collaborator permutations. Args: - local_tensors (list[LocalTensor]): List of local tensors - containing deciphertexts. - tags (Tuple[str]): Tags indicating the type of aggregation to - perform. + db_iterator: An iterator for the database. + local_tensors (list[LocalTensor]): A list of LocalTensor objects + containing the decrypted ciphertext tensors. + tags (Tuple[str]): A tuple of tags indicating the type of tensors. Returns: - np.ndarray: Aggregated deciphertexts tensor. + np.ndarray: A numpy array containing the reconstructed seeds and + the agreed keys tensor. """ - aggregated_tensor = [] if "deciphertext" in tags: - aggregated_tensor = [tensor.tensor for tensor in local_tensors] + seed_shares = {} + key_shares = {} + for tensor in local_tensors: + source_collaborator = tensor.tensor[0] + dest_collaborator = tensor.tensor[1] + if source_collaborator not in seed_shares: + seed_shares[source_collaborator] = {} + if source_collaborator not in key_shares: + key_shares[source_collaborator] = {} + seed_shares[source_collaborator][dest_collaborator] = tensor.tensor[2] + key_shares[source_collaborator][dest_collaborator] = tensor.tensor[3] + # Reconstruct the secrets (seeds and private keys) for all source + # collaborators. 
+ seeds = [] + keys = {} + for collaborator in seed_shares: + seed = reconstruct_secret(seed_shares[collaborator]) + seeds.append([collaborator, seed]) + keys[collaborator] = reconstruct_secret(key_shares[collaborator]) + # Generate the agreed keys for all collaborator permutations. + agreed_keys_tensor = self._generate_agreed_keys(keys, db_iterator) + + return np.array([seeds, agreed_keys_tensor]) + + def _generate_agreed_keys(self, reconstructed_keys, db_iterator): + """ + This function takes reconstructed private keys and a database iterator + to fetch public keys, and generates agreed keys for all permutations + of collaborators. - return np.array(aggregated_tensor) + Args: + reconstructed_keys (dict): A dictionary where keys are + collaborator IDs and values are their reconstructed private + keys. + db_iterator (iterable): An iterable that yields items containing + public keys in their "tags". + + Returns: + list: A list of lists, where each inner list contains: + - private_key_collaborator_id (int): The ID of the + collaborator with the private key. + - public_key_collaborator_id (int): The ID of the collaborator + with the public key. + - agreed_key (Any): The generated agreed key for the pair of + collaborators. + """ + agreed_keys = [] + # Fetch the public keys from tensor db. + public_keys = [] + for item in db_iterator: + if "tags" in item and "public_key" in item["tags"]: + public_keys.append(item["nparray"]) + # Generate agreed keys for all collaborator permutations. + for item in public_keys: + public_key_collaborator_id = item[0] + public_key_1 = item[1] + for private_key_collaborator_id in reconstructed_keys: + agreed_key = generate_agreed_key( + reconstructed_keys[private_key_collaborator_id], public_key_1 + ) + agreed_keys.append( + [private_key_collaborator_id, public_key_collaborator_id, agreed_key] + ) + + return agreed_keys From a0af32303f1894470ced0905365c1f824784fd0c Mon Sep 17 00:00:00 2001 From: "Pant, Akshay" Date: Mon, 13 Jan 2025 15:37:38 +0530 Subject: [PATCH 06/10] feat(secure aggregation): adds the aggregation logic removing the masks from the vectors Signed-off-by: Pant, Akshay --- openfl/federated/task/runner_sa.py | 4 +- .../secure_aggregation.py | 79 ++++++++++++++++++- 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/openfl/federated/task/runner_sa.py b/openfl/federated/task/runner_sa.py index da78ea283e..af59e265f1 100644 --- a/openfl/federated/task/runner_sa.py +++ b/openfl/federated/task/runner_sa.py @@ -311,9 +311,9 @@ def _mask( # ciphertext[4] is the agreed key for the two collaborators. shared_mask = pseudo_random_generator(ciphertext[4], shape) if index < ciphertext[0]: - shared_mask *= -1 + shared_mask = np.dot(-1, shared_mask) elif index == ciphertext[0]: - shared_mask = 0 + shared_mask = np.dot(0, shared_mask) # Add masks for all the collaborators. 
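+            # For a pair (i, j) with i < j, collaborator i applies the mask
+            # negatively and j applies it positively, so the pairwise masks
+            # cancel once all masked updates are summed.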
         gradient = np.add(gradient, shared_mask)
 
         # Generate private mask for the collaborator.
         private_mask = pseudo_random_generator(private_seed, shape)
         gradient = np.add(gradient, private_mask)
 
         return gradient
diff --git a/openfl/interface/aggregation_functions/secure_aggregation.py b/openfl/interface/aggregation_functions/secure_aggregation.py
index 734bad2489..fb6122a5b1 100644
--- a/openfl/interface/aggregation_functions/secure_aggregation.py
+++ b/openfl/interface/aggregation_functions/secure_aggregation.py
@@ -11,7 +11,7 @@
 
 from openfl.interface.aggregation_functions.core import AggregationFunction
 from openfl.utilities import LocalTensor
-from openfl.utilities.secagg import generate_agreed_key, reconstruct_secret
+from openfl.utilities.secagg import generate_agreed_key, pseudo_random_generator, reconstruct_secret
 
 
 class SecureAggregation(AggregationFunction):
@@ -47,6 +47,7 @@ def call(
         self._aggregate_public_keys(local_tensors, tags)
         self._aggregate_ciphertexts(local_tensors, tags)
         self._rebuild_secrets(db_iterator, local_tensors, tags)
+        return self._aggregate(local_tensors, db_iterator, tags)
 
     def _aggregate_public_keys(
@@ -203,3 +204,79 @@ def _generate_agreed_keys(self, reconstructed_keys, db_iterator):
             )
 
         return agreed_keys
+
+    def _aggregate(self, local_tensors, db_iterator, tags):
+        """
+        Aggregates local tensors using secure aggregation.
+
+        This method performs secure aggregation by reconstructing private
+        seeds and agreed keys, generating private and shared masks, and then
+        removing these masks from the local tensors.
+
+        Args:
+            local_tensors (list): A list of local tensors to be aggregated.
+            db_iterator (iterator): An iterator over the database containing
+                the secrets.
+            tags (list): A list of tags to filter the database items.
+
+        Returns:
+            numpy.ndarray: The aggregated result after removing the masks.
+
+        Notes:
+            - The `rebuilt_secrets[0]` contains private seeds in the format:
+                [
+                    [collaborator_id, private_seed],
+                    ...
+                ]
+            - The `rebuilt_secrets[1]` contains agreed keys in the format:
+                [
+                    [collaborator_1, collaborator_2, agreed_key_1_2],
+                    ...
+                ]
+            - The method checks for the presence of specific tags
+              ("public_key", "ciphertext", "deciphertext") to determine if
+              the aggregation should proceed.
+            - The shape of the local tensors is assumed to be the same for all
+              tensors.
+            - Private masks are generated using the private seeds, and shared
+              masks are generated using the agreed keys.
+            - The final result is obtained by subtracting the total mask sum
+              from the masked sum of the input tensors.
+        """
+        if not any(element in tags for element in ["public_key", "ciphertext", "deciphertext"]):
+            # Get the private seeds and agreed keys.
+            rebuilt_secrets = []
+            for item in db_iterator:
+                if "tags" in item and "deciphertext" in item["tags"]:
+                    rebuilt_secrets.append(item["nparray"])
+
+            # Get the shape of the local tensors sent for aggregation.
+            if len(local_tensors) > 0:
+                shape = local_tensors[0].tensor.shape
+
+            # Get sum of all private masks.
+            private_mask_sum = 0
+            for seed in rebuilt_secrets[0]:
+                private_mask_sum = np.add(private_mask_sum, pseudo_random_generator(seed[1], shape))
+
+            # Get sum of all shared masks.
+            shared_mask_sum = 0
+            for agreed_key in rebuilt_secrets[1]:
+                collaborator_1 = agreed_key[0]
+                collaborator_2 = agreed_key[1]
+                shared_mask = pseudo_random_generator(agreed_key[2], shape)
+                if collaborator_1 < collaborator_2:
+                    shared_mask = np.dot(-1, shared_mask)
+                elif collaborator_1 == collaborator_2:
+                    shared_mask = np.dot(0, shared_mask)
+                shared_mask_sum = np.add(shared_mask_sum, shared_mask)
+            total_mask_sum = np.add(private_mask_sum, shared_mask_sum)
+
+            # Get the sum of the masked input vectors.
+            masked_sum = np.sum([tensor.tensor for tensor in local_tensors], axis=0)
+
+            # Get the sum of the vectors after removing the masks.
+            gradient_sum = np.subtract(masked_sum, total_mask_sum)
+
+            # Return the average.
+            return np.divide(gradient_sum, len(local_tensors))
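
For intuition, the pairwise masks introduced above cancel once every
collaborator's masked update is summed, and the private masks are removed
using the seeds rebuilt from the Shamir shares. A minimal sketch of the
arithmetic with two collaborators and hypothetical seed values (it assumes
the secagg utilities from PATCH 01 are importable):

    import numpy as np

    from openfl.utilities.secagg import pseudo_random_generator

    shape = (4,)
    grad_1, grad_2 = np.ones(shape), np.ones(shape)

    # Both collaborators expand the same agreed key into the same mask; the
    # lower-indexed one subtracts it, the higher-indexed one adds it.
    pairwise = pseudo_random_generator(b"agreed-key-1-2", shape)
    masked_1 = grad_1 - pairwise + pseudo_random_generator(0.25, shape)
    masked_2 = grad_2 + pairwise + pseudo_random_generator(0.75, shape)

    # The pairwise masks cancel in the sum; the aggregator subtracts the
    # private masks once the seeds are reconstructed from their shares.
    private = pseudo_random_generator(0.25, shape) + pseudo_random_generator(0.75, shape)
    assert np.allclose(masked_1 + masked_2 - private, grad_1 + grad_2)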
From fa5d8d34307bf8caf39cebea8525141ac6c00bc7 Mon Sep 17 00:00:00 2001
From: "Pant, Akshay"
Date: Fri, 17 Jan 2025 11:09:50 +0530
Subject: [PATCH 07/10] feat(secure aggregation): added init tensorkeys method
 for the runner

Signed-off-by: Pant, Akshay
---
 openfl/federated/task/runner_sa.py | 55 ++++++++++++++++++++++++++----
 1 file changed, 49 insertions(+), 6 deletions(-)

diff --git a/openfl/federated/task/runner_sa.py b/openfl/federated/task/runner_sa.py
index af59e265f1..bb744b2668 100644
--- a/openfl/federated/task/runner_sa.py
+++ b/openfl/federated/task/runner_sa.py
@@ -7,6 +7,7 @@
 
 import numpy as np
 
+from openfl.federated.task.runner import TaskRunner
 from openfl.utilities import TensorKey
 from openfl.utilities.secagg import (
     create_ciphertext,
@@ -18,7 +19,45 @@
 )
 
 
-class SATaskRunner:
+class SATaskRunner(TaskRunner):
+    def __init__(self, device: str = None, loss_fn=None, optimizer=None, **kwargs):
+        super().__init__(**kwargs)
+        self.required_tensorkeys_for_function = {}
+
+    def initialize_tensorkeys_for_functions(self, with_opt_vars=False):
+        """
+        Initialize the required TensorKeys for various functions.
+
+        This method sets up the required TensorKeys for the functions
+        "generate_keys", "generate_ciphertexts", and "decrypt_ciphertexts".
+        The TensorKeys specify the name, scope, version, and other attributes
+        for the tensors used in these functions.
+
+        Args:
+            with_opt_vars (bool): If True, includes optional variables in the
+                TensorKeys. Defaults to False.
+
+        TensorKeys:
+            - "generate_keys": No TensorKeys required.
+            - "generate_ciphertexts":
+                - public_key: A global tensor key for the public key.
+                - public_key_local: A local tensor key for the public key.
+            - "decrypt_ciphertexts":
+                - ciphertext: A global tensor key for the ciphertext.
+                - ciphertext_local: A local tensor key for the ciphertext.
+                - index: A local tensor key for the index.
+        """
+        self.required_tensorkeys_for_function["generate_keys"] = []
+        self.required_tensorkeys_for_function["generate_ciphertexts"] = [
+            TensorKey("public_key", "GLOBAL", 1, False, ("public_key")),
+            TensorKey("public_key_local", "LOCAL", 1, False, ("public_key")),
+        ]
+        self.required_tensorkeys_for_function["decrypt_ciphertexts"] = [
+            TensorKey("ciphertext", "GLOBAL", 1, False, ("ciphertext")),
+            TensorKey("ciphertext_local", "LOCAL", 1, False, ("ciphertext_local")),
+            TensorKey("index", "LOCAL", 1, False, ())
+        ]
+
     def generate_keys(
         self,
         col_name,
@@ -80,11 +124,15 @@ def generate_keys(
                 private_key1,
                 private_key2,
             ],
+            TensorKey("public_key_local", col_name, round_number, False, ("public_key")): [
+                public_key1,
+                public_key2,
+            ],
             TensorKey("private_seed", col_name, round_number, False, ()): [random.random()],
         }
 
         global_tensor_dict = {
-            TensorKey("public_key_local", col_name, round_number, False, ("public_key")): [
+            TensorKey("public_key", col_name, round_number, False, ("public_key")): [
                 public_key1,
                 public_key2,
             ]
@@ -123,7 +167,7 @@ def generate_ciphertexts(
 
         Required tensors for the task include:
             - GLOBAL public_key
-            - public_key_local
+            - LOCAL public_key_local
 
         Args:
             col_name (str): The column name for the tensor key.
@@ -219,10 +263,10 @@ def decrypt_ciphertexts(
 
         Required tensors for the task include:
             - GLOBAL ciphertext.
- - index - - ciphertext_local + - LOCAL ciphertext_local + - LOCAL index - Args: + Args: col_name (str): The name of the column. round_number (int): The current round number. input_tensor_dict (dict): A dictionary containing the required From c16b4df8b93aa3b4733aa07e26c053bc969bec5c Mon Sep 17 00:00:00 2001 From: "Pant, Akshay" Date: Fri, 17 Jan 2025 11:11:29 +0530 Subject: [PATCH 08/10] feat(secagg example): add initial commit for secagg example workspace Signed-off-by: Pant, Akshay --- .../keras_cnn_mnist_secagg/.workspace | 2 + .../keras_cnn_mnist_secagg/plan/cols.yaml | 5 + .../keras_cnn_mnist_secagg/plan/data.yaml | 7 ++ .../keras_cnn_mnist_secagg/plan/defaults | 2 + .../keras_cnn_mnist_secagg/plan/plan.yaml | 63 ++++++++++ .../keras_cnn_mnist_secagg/requirements.txt | 3 + .../keras_cnn_mnist_secagg/src/__init__.py | 3 + .../keras_cnn_mnist_secagg/src/dataloader.py | 47 +++++++ .../keras_cnn_mnist_secagg/src/mnist_utils.py | 118 ++++++++++++++++++ .../keras_cnn_mnist_secagg/src/taskrunner.py | 26 ++++ 10 files changed, 276 insertions(+) create mode 100644 openfl-workspace/keras_cnn_mnist_secagg/.workspace create mode 100644 openfl-workspace/keras_cnn_mnist_secagg/plan/cols.yaml create mode 100644 openfl-workspace/keras_cnn_mnist_secagg/plan/data.yaml create mode 100644 openfl-workspace/keras_cnn_mnist_secagg/plan/defaults create mode 100644 openfl-workspace/keras_cnn_mnist_secagg/plan/plan.yaml create mode 100644 openfl-workspace/keras_cnn_mnist_secagg/requirements.txt create mode 100644 openfl-workspace/keras_cnn_mnist_secagg/src/__init__.py create mode 100644 openfl-workspace/keras_cnn_mnist_secagg/src/dataloader.py create mode 100644 openfl-workspace/keras_cnn_mnist_secagg/src/mnist_utils.py create mode 100644 openfl-workspace/keras_cnn_mnist_secagg/src/taskrunner.py diff --git a/openfl-workspace/keras_cnn_mnist_secagg/.workspace b/openfl-workspace/keras_cnn_mnist_secagg/.workspace new file mode 100644 index 0000000000..3c2c5d08b4 --- /dev/null +++ b/openfl-workspace/keras_cnn_mnist_secagg/.workspace @@ -0,0 +1,2 @@ +current_plan_name: default + diff --git a/openfl-workspace/keras_cnn_mnist_secagg/plan/cols.yaml b/openfl-workspace/keras_cnn_mnist_secagg/plan/cols.yaml new file mode 100644 index 0000000000..95307de3bc --- /dev/null +++ b/openfl-workspace/keras_cnn_mnist_secagg/plan/cols.yaml @@ -0,0 +1,5 @@ +# Copyright (C) 2020-2021 Intel Corporation +# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. + +collaborators: + \ No newline at end of file diff --git a/openfl-workspace/keras_cnn_mnist_secagg/plan/data.yaml b/openfl-workspace/keras_cnn_mnist_secagg/plan/data.yaml new file mode 100644 index 0000000000..257c7825fe --- /dev/null +++ b/openfl-workspace/keras_cnn_mnist_secagg/plan/data.yaml @@ -0,0 +1,7 @@ +# Copyright (C) 2020-2021 Intel Corporation +# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. 
+ +# collaborator_name,data_directory_path +one,1 + + diff --git a/openfl-workspace/keras_cnn_mnist_secagg/plan/defaults b/openfl-workspace/keras_cnn_mnist_secagg/plan/defaults new file mode 100644 index 0000000000..fb82f9c5b6 --- /dev/null +++ b/openfl-workspace/keras_cnn_mnist_secagg/plan/defaults @@ -0,0 +1,2 @@ +../../workspace/plan/defaults + diff --git a/openfl-workspace/keras_cnn_mnist_secagg/plan/plan.yaml b/openfl-workspace/keras_cnn_mnist_secagg/plan/plan.yaml new file mode 100644 index 0000000000..576a93a132 --- /dev/null +++ b/openfl-workspace/keras_cnn_mnist_secagg/plan/plan.yaml @@ -0,0 +1,63 @@ +# Copyright (C) 2020-2021 Intel Corporation +# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you. + +aggregator : + defaults : plan/defaults/aggregator.yaml + template : openfl.component.Aggregator + settings : + init_state_path : save/init.pbuf + best_state_path : save/best.pbuf + last_state_path : save/last.pbuf + rounds_to_train : 10 + +collaborator : + defaults : plan/defaults/collaborator.yaml + template : openfl.component.Collaborator + settings : + delta_updates : false + opt_treatment : RESET + +data_loader : + defaults : plan/defaults/data_loader.yaml + template : src.dataloader.KerasMNISTInMemory + settings : + collaborator_count : 2 + data_group_name : mnist + batch_size : 256 + +task_runner : + defaults : plan/defaults/task_runner.yaml + template : src.taskrunner.KerasCNN + +network : + defaults : plan/defaults/network.yaml + +assigner : + defaults : plan/defaults/assigner.yaml + template : openfl.component.RandomGroupedAssigner + settings : + task_groups : + - name : secagg_setup + percentage : 1.0 + tasks : + - generate_keys + - generate_ciphertexts + - decrypt_ciphertexts + +tasks: + decrypt_ciphertexts: + function: decrypt_ciphertexts + kwargs: {} + generate_ciphertexts: + function: generate_ciphertexts + kwargs: {} + generate_keys: + function: generate_keys + kwargs: {} + +compression_pipeline : + defaults : plan/defaults/compression_pipeline.yaml + # To use different Compression Pipeline, uncomment the following lines + # template : openfl.pipelines.KCPipeline + # settings : + # n_clusters : 6 diff --git a/openfl-workspace/keras_cnn_mnist_secagg/requirements.txt b/openfl-workspace/keras_cnn_mnist_secagg/requirements.txt new file mode 100644 index 0000000000..5fa9907811 --- /dev/null +++ b/openfl-workspace/keras_cnn_mnist_secagg/requirements.txt @@ -0,0 +1,3 @@ +keras==3.6.0 +tensorflow==2.18.0 + diff --git a/openfl-workspace/keras_cnn_mnist_secagg/src/__init__.py b/openfl-workspace/keras_cnn_mnist_secagg/src/__init__.py new file mode 100644 index 0000000000..f1410b1298 --- /dev/null +++ b/openfl-workspace/keras_cnn_mnist_secagg/src/__init__.py @@ -0,0 +1,3 @@ +# Copyright (C) 2020-2021 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 +"""You may copy this file as the starting point of your own model.""" diff --git a/openfl-workspace/keras_cnn_mnist_secagg/src/dataloader.py b/openfl-workspace/keras_cnn_mnist_secagg/src/dataloader.py new file mode 100644 index 0000000000..040e8091c9 --- /dev/null +++ b/openfl-workspace/keras_cnn_mnist_secagg/src/dataloader.py @@ -0,0 +1,47 @@ +# Copyright (C) 2020-2021 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""You may copy this file as the starting point of your own model.""" + +from openfl.federated import KerasDataLoader +from .mnist_utils import load_mnist_shard + + +class KerasMNISTInMemory(KerasDataLoader): + """Data Loader for 
MNIST Dataset.""" + + def __init__(self, data_path, batch_size, **kwargs): + """ + Initialize. + + Args: + data_path: File path for the dataset + batch_size (int): The batch size for the data loader + **kwargs: Additional arguments, passed to super init and load_mnist_shard + """ + super().__init__(batch_size, **kwargs) + + # TODO: We should be downloading the dataset shard into a directory + # TODO: There needs to be a method to ask how many collaborators and + # what index/rank is this collaborator. + # Then we have a way to automatically shard based on rank and size of + # collaborator list. + try: + int(data_path) + except: + raise ValueError( + "Expected `%s` to be representable as `int`, as it refers to the data shard " + + "number used by the collaborator.", + data_path + ) + + _, num_classes, X_train, y_train, X_valid, y_valid = load_mnist_shard( + shard_num=int(data_path), **kwargs + ) + + self.X_train = X_train + self.y_train = y_train + self.X_valid = X_valid + self.y_valid = y_valid + + self.num_classes = num_classes diff --git a/openfl-workspace/keras_cnn_mnist_secagg/src/mnist_utils.py b/openfl-workspace/keras_cnn_mnist_secagg/src/mnist_utils.py new file mode 100644 index 0000000000..d19e13d9dd --- /dev/null +++ b/openfl-workspace/keras_cnn_mnist_secagg/src/mnist_utils.py @@ -0,0 +1,118 @@ +# Copyright (C) 2020-2021 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""You may copy this file as the starting point of your own model.""" + +from logging import getLogger + +import numpy as np +from tensorflow.python.keras.utils.data_utils import get_file + +logger = getLogger(__name__) + + +def one_hot(labels, classes): + """ + One Hot encode a vector. + + Args: + labels (list): List of labels to onehot encode + classes (int): Total number of categorical classes + + Returns: + np.array: Matrix of one-hot encoded labels + """ + return np.eye(classes)[labels] + + +def _load_raw_datashards(shard_num, collaborator_count): + """ + Load the raw data by shard. + + Returns tuples of the dataset shard divided into training and validation. + + Args: + shard_num (int): The shard number to use + collaborator_count (int): The number of collaborators in the federation + + Returns: + 2 tuples: (image, label) of the training, validation dataset + """ + origin_folder = 'https://storage.googleapis.com/tensorflow/tf-keras-datasets/' + path = get_file('mnist.npz', + origin=origin_folder + 'mnist.npz', + file_hash='731c5ac602752760c8e48fbffcf8c3b850d9dc2a2aedcf2cc48468fc17b673d1') + + with np.load(path) as f: + # get all of mnist + X_train_tot = f['x_train'] + y_train_tot = f['y_train'] + + X_valid_tot = f['x_test'] + y_valid_tot = f['y_test'] + + # create the shards + shard_num = int(shard_num) + X_train = X_train_tot[shard_num::collaborator_count] + y_train = y_train_tot[shard_num::collaborator_count] + + X_valid = X_valid_tot[shard_num::collaborator_count] + y_valid = y_valid_tot[shard_num::collaborator_count] + + return (X_train, y_train), (X_valid, y_valid) + + +def load_mnist_shard(shard_num, collaborator_count, categorical=True, + channels_last=True, **kwargs): + """ + Load the MNIST dataset. 
+ + Args: + shard_num (int): The shard to use from the dataset + collaborator_count (int): The number of collaborators in the federation + categorical (bool): True = convert the labels to one-hot encoded + vectors (Default = True) + channels_last (bool): True = The input images have the channels + last (Default = True) + **kwargs: Additional parameters to pass to the function + + Returns: + list: The input shape + int: The number of classes + numpy.ndarray: The training data + numpy.ndarray: The training labels + numpy.ndarray: The validation data + numpy.ndarray: The validation labels + """ + img_rows, img_cols = 28, 28 + num_classes = 10 + + (X_train, y_train), (X_valid, y_valid) = _load_raw_datashards( + shard_num, collaborator_count + ) + + if channels_last: + X_train = X_train.reshape(X_train.shape[0], img_rows, img_cols, 1) + X_valid = X_valid.reshape(X_valid.shape[0], img_rows, img_cols, 1) + input_shape = (img_rows, img_cols, 1) + else: + X_train = X_train.reshape(X_train.shape[0], 1, img_rows, img_cols) + X_valid = X_valid.reshape(X_valid.shape[0], 1, img_rows, img_cols) + input_shape = (1, img_rows, img_cols) + + X_train = X_train.astype('float32') + X_valid = X_valid.astype('float32') + X_train /= 255 + X_valid /= 255 + + logger.info(f'MNIST > X_train Shape : {X_train.shape}') + logger.info(f'MNIST > y_train Shape : {y_train.shape}') + logger.info(f'MNIST > Train Samples : {X_train.shape[0]}') + logger.info(f'MNIST > Valid Samples : {X_valid.shape[0]}') + + if categorical: + # convert class vectors to binary class matrices + y_train = one_hot(y_train, num_classes) + y_valid = one_hot(y_valid, num_classes) + + return input_shape, num_classes, X_train, y_train, X_valid, y_valid diff --git a/openfl-workspace/keras_cnn_mnist_secagg/src/taskrunner.py b/openfl-workspace/keras_cnn_mnist_secagg/src/taskrunner.py new file mode 100644 index 0000000000..c7b3c3dd65 --- /dev/null +++ b/openfl-workspace/keras_cnn_mnist_secagg/src/taskrunner.py @@ -0,0 +1,26 @@ +# Copyright (C) 2020-2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +"""You may copy this file as the starting point of your own model.""" + +from keras.models import Sequential +from keras.layers import Conv2D +from keras.layers import Dense +from keras.layers import Flatten + +from openfl.federated.task.runner_sa import SATaskRunner + + +class KerasCNN(SATaskRunner): + """A basic convolutional neural network model.""" + + def __init__(self, **kwargs): + """ + Initialize. 
+
+        Args:
+            **kwargs: Additional parameters to pass to the function.
+        """
+        super().__init__(**kwargs)
+
+        self.initialize_tensorkeys_for_functions()

From dc0de2d70fb6b66988c8834bb5db7f54918b7fe7 Mon Sep 17 00:00:00 2001
From: "Pant, Akshay"
Date: Tue, 21 Jan 2025 16:56:09 +0530
Subject: [PATCH 09/10] fix(secagg workspace): update the tasks order

Signed-off-by: Pant, Akshay
---
 openfl-workspace/keras_cnn_mnist_secagg/plan/plan.yaml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/openfl-workspace/keras_cnn_mnist_secagg/plan/plan.yaml b/openfl-workspace/keras_cnn_mnist_secagg/plan/plan.yaml
index 576a93a132..e48b73ee08 100644
--- a/openfl-workspace/keras_cnn_mnist_secagg/plan/plan.yaml
+++ b/openfl-workspace/keras_cnn_mnist_secagg/plan/plan.yaml
@@ -45,14 +45,14 @@ assigner :
         - decrypt_ciphertexts
 
 tasks:
-  decrypt_ciphertexts:
-    function: decrypt_ciphertexts
+  generate_keys:
+    function: generate_keys
     kwargs: {}
   generate_ciphertexts:
     function: generate_ciphertexts
     kwargs: {}
-  generate_keys:
-    function: generate_keys
+  decrypt_ciphertexts:
+    function: decrypt_ciphertexts
     kwargs: {}
 
 compression_pipeline :

From a476748f375c1418d164a657a4021d347672a495 Mon Sep 17 00:00:00 2001
From: "Pant, Akshay"
Date: Tue, 21 Jan 2025 16:59:07 +0530
Subject: [PATCH 10/10] docs(secagg runner): add note defining the use of
 class

Signed-off-by: Pant, Akshay
---
 openfl/federated/task/runner_sa.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/openfl/federated/task/runner_sa.py b/openfl/federated/task/runner_sa.py
index bb744b2668..055bf1253a 100644
--- a/openfl/federated/task/runner_sa.py
+++ b/openfl/federated/task/runner_sa.py
@@ -20,6 +20,12 @@
 
 
 class SATaskRunner(TaskRunner):
+    """
+    NOTE: This class is only for testing purposes.
+    The tasks/methods defined in this class would be placed in the TaskRunner
+    class itself. By doing so, users would be able to leverage secure
+    aggregation using a framework of their choice: torch, keras, etc.
+    """
     def __init__(self, device: str = None, loss_fn=None, optimizer=None, **kwargs):
         super().__init__(**kwargs)
         self.required_tensorkeys_for_function = {}
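
A quick local smoke test of the utilities introduced in this series (a
sketch, run outside of a federation; it assumes the package from this branch
is installed, and passes curve="p256" on the assumption that pycryptodome's
key agreement needs an ECDH-capable curve, which the "ed25519" default may
not satisfy):

    from openfl.utilities.secagg import (
        create_ciphertext,
        create_secret_shares,
        decipher_ciphertext,
        generate_agreed_key,
        generate_key_pair,
        reconstruct_secret,
    )

    # Two collaborators derive the same symmetric key from opposite key pairs.
    priv_a, pub_a = generate_key_pair(curve="p256")
    priv_b, pub_b = generate_key_pair(curve="p256")
    key_ab = generate_agreed_key(priv_a, pub_b)
    assert key_ab == generate_agreed_key(priv_b, pub_a)

    # Round-trip two shares through the authenticated cipher.
    ciphertext, mac, nonce = create_ciphertext(key_ab, 1, 2, b"seed-share", b"key-share")
    assert decipher_ciphertext(key_ab, ciphertext, mac, nonce) == (1, 2, b"seed-share", b"key-share")

    # Split a secret into 3 shares and reconstruct it.
    shares = create_secret_shares(b"private seed", count=3, threshold=3)
    assert reconstruct_secret(shares) == b"private seed"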