From cacf3f882fc94cbcfb2debbed3d5e4006a588b9b Mon Sep 17 00:00:00 2001 From: Boris Lami Fonyuy Date: Mon, 16 Oct 2023 05:11:26 -0700 Subject: [PATCH] Add functionality that avoids putting neighbouring examples into different train/test split PiperOrigin-RevId: 573779093 --- requirements.txt | 1 + src/create_labeled_dataset.py | 3 + src/skai/cloud_labeling.py | 170 ++++++++++++++++++++++--- src/skai/cloud_labeling_test.py | 219 +++++++++++++++++++++++++++++--- src/skai/utils.py | 13 ++ src/skai/utils_test.py | 55 ++++++++ 6 files changed, 420 insertions(+), 41 deletions(-) create mode 100644 src/skai/utils_test.py diff --git a/requirements.txt b/requirements.txt index 01149677..d73231a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -25,3 +25,4 @@ tqdm openlocationcode xmanager tensorflow-text +scipy \ No newline at end of file diff --git a/src/create_labeled_dataset.py b/src/create_labeled_dataset.py index 191c0d24..4c801880 100644 --- a/src/create_labeled_dataset.py +++ b/src/create_labeled_dataset.py @@ -42,6 +42,8 @@ 'If specified, random seed for train/test split.') flags.DEFINE_float('test_fraction', 0.2, 'Fraction of labeled examples to use for testing.') +flags.DEFINE_float('connecting_distance_meters', 77.0, + 'Maximum distance for two points to be connected.') flags.DEFINE_list( 'string_to_numeric_labels', [ @@ -75,6 +77,7 @@ def main(unused_argv): FLAGS.test_fraction, FLAGS.train_output_path, FLAGS.test_output_path, + FLAGS.connecting_distance_meters, FLAGS.use_multiprocessing) diff --git a/src/skai/cloud_labeling.py b/src/skai/cloud_labeling.py index fc69751f..53587b19 100644 --- a/src/skai/cloud_labeling.py +++ b/src/skai/cloud_labeling.py @@ -22,16 +22,18 @@ import queue import random import time -from typing import Dict, Iterable, List, Optional, Tuple, Set +from typing import Dict, Iterable, List, Optional, Set, Tuple from absl import logging - +import geopandas as gpd from google.cloud import aiplatform from google.cloud import aiplatform_v1 +import numpy as np import pandas as pd import PIL.Image import PIL.ImageDraw import PIL.ImageFont +import scipy from skai import utils import tensorflow as tf @@ -490,40 +492,158 @@ def _write_tfrecord(examples: Iterable[Example], path: str) -> None: writer.write(example.SerializeToString()) +def get_connection_matrix( + longitudes: List[float], + latitudes: List[float], + encoded_coordinates: List[str], + connecting_distance_meters: float, +)-> Tuple[gpd.GeoDataFrame, np.ndarray]: + """Gets a connection matrix for a set of points. + + Args: + longitudes: Longitudes of points. + latitudes: Latitudes of points. + encoded_coordinates: Encoded coordinates of points. + connecting_distance_meters: Maximum distance for two points to be connected. + + Returns: + Tuple of (GeoDataFrame, connection_matrix). + """ + points = gpd.GeoSeries(gpd.points_from_xy( + longitudes, + latitudes, + )).set_crs(4326) + + centroid = points.unary_union.centroid + utm_points = points.to_crs(utils.convert_wgs_to_utm(centroid.x, centroid.y)) + + gpd_df = gpd.GeoDataFrame( + {'encoded_coordinates': encoded_coordinates}, + geometry=utm_points + ) + + def calculate_euclidean_distance(row): + return gpd_df.distance(row.geometry) + + distances = np.array(gpd_df.apply(calculate_euclidean_distance, axis=1)) + connection_matrix = (distances < connecting_distance_meters).astype('int') + + assert connection_matrix.shape == ( + len(encoded_coordinates), + len(encoded_coordinates), + ) + + return gpd_df, connection_matrix + + +def get_connected_labels( + connection_matrix: np.ndarray, +) -> List[str]: + """Gets the labels of connected components. + + Args: + connection_matrix: Connection matrix. + + Returns: + List of labels of connected components. Components with the same label are + connected and are therefore connected. + """ + graph = scipy.sparse.csr_matrix(connection_matrix) + _, labels = scipy.sparse.csgraph.connected_components( + csgraph=graph, directed=False, return_labels=True + ) + + return list(labels) + + def _split_examples( examples: List[Example], - test_fraction: float + test_fraction: float, + connecting_distance_meters: float, ) -> Tuple[List[Example], List[Example]]: """Splits a list of examples into training and test sets. Examples with the same encoded coordinates will always end up in the same - split to prevent leaking information between training and test sets. + split to prevent leaking information between training and test sets. Any two + examples separated by less than connecting_distance_meters will always be in + the same split. Args: examples: Input examples. test_fraction: Fraction of examples to use for testing. + connecting_distance_meters: Maximum distance for two points to be connected. Returns: Tuple of (training examples, test examples). """ - coordinates_to_examples = collections.defaultdict(list) + longitudes = [] + latitudes = [] + encoded_coordinates = [] for example in examples: - c = example.features.feature['encoded_coordinates'].bytes_list.value[0] - coordinates_to_examples[c].append(example) - - shuffled = random.sample(sorted(coordinates_to_examples.keys()), - len(coordinates_to_examples)) - num_test = int(len(shuffled) * test_fraction) - test_examples = [] - for coordinate in shuffled[:num_test]: - test_examples.extend(coordinates_to_examples[coordinate]) - - train_examples = [] - for coordinate in shuffled[num_test:]: - train_examples.extend(coordinates_to_examples[coordinate]) + encoded_coordinate = utils.get_string_feature( + example, 'encoded_coordinates' + ) + longitude, latitude = utils.get_float_feature(example, 'coordinates') + longitudes.append(longitude) + latitudes.append(latitude) + encoded_coordinates.append(encoded_coordinate) + + gpd_df, connection_matrix = get_connection_matrix( + longitudes, latitudes, encoded_coordinates, connecting_distance_meters + ) + labels = get_connected_labels(connection_matrix) + connected_groups = collections.defaultdict(list) + for idx, key in enumerate(labels): + connected_groups[key].append(idx) + + list_of_connected_examples = [] + for _, connected_group in connected_groups.items(): + list_of_connected_examples.append(connected_group) + + num_test = int(len(gpd_df) * test_fraction) + test_indices = get_testset_indices(num_test, list_of_connected_examples) + test_examples = [examples[idx] for idx in test_indices] + train_examples = [ + examples[idx] for idx in range(len(examples)) if idx not in test_indices + ] + return train_examples, test_examples +def get_testset_indices(num_test, list_of_connected_examples): + """Get random list of indices corresponding to test examples. + + Args: + num_test: Number of test examples. + list_of_connected_examples: List of connected examples. + + Returns: + List of indices corresponding test examples. + """ + max_num_attempts_train_test_splits = 10000 + best_test_indices = [] + min_diff_best_num_test = num_test + + for _ in range(max_num_attempts_train_test_splits): + # Ensure randomness + random_list_of_connected_examples = random.sample( + list_of_connected_examples, len(list_of_connected_examples) + ) + current_test_indices = [] + + for connected_component in random_list_of_connected_examples: + current_test_indices.extend(connected_component) + if abs(len(current_test_indices) - num_test) < min_diff_best_num_test: + best_test_indices = current_test_indices.copy() + min_diff_best_num_test = abs(len(best_test_indices) - num_test) + + # Stop trials once best best_test_indices is found + if min_diff_best_num_test == 0: + return best_test_indices + + return best_test_indices + + def _merge_single_example_file_and_labels( example_file: str, labels: Dict[str, List[Tuple[str, float, str]]] ) -> List[Example]: @@ -583,6 +703,7 @@ def _merge_examples_and_labels( test_fraction: float, train_output_path: str, test_output_path: str, + connecting_distance_meters: float, use_multiprocessing: bool, ) -> None: """Merges examples with labels into train and test TFRecords. @@ -594,6 +715,7 @@ def _merge_examples_and_labels( test_fraction: Fraction of examples to write to test output. train_output_path: Path to training examples TFRecord output. test_output_path: Path to test examples TFRecord output. + connecting_distance_meters: Maximum distance for two points to be connected. use_multiprocessing: If true, create multiple processes to create labeled examples. """ @@ -629,11 +751,16 @@ def _merge_examples_and_labels( all_labeled_examples.extend(result) train_examples, test_examples = _split_examples( - all_labeled_examples, test_fraction + all_labeled_examples, test_fraction, connecting_distance_meters ) _write_tfrecord(train_examples, train_output_path) _write_tfrecord(test_examples, test_output_path) + logging.info( + 'Written %d test examples and %d train examples', + len(test_examples), + len(train_examples), + ) def _get_labels_from_dataset( @@ -669,7 +796,6 @@ def _get_labels_from_dataset( labels.update(_read_label_annotations_file(path)) tf.io.gfile.remove(path) - logging.info('Read %d labels total.', len(labels)) return [ (example_id, label, dataset_id) for example_id, label in labels.items() ] @@ -709,6 +835,7 @@ def create_labeled_examples( test_fraction: float, train_output_path: str, test_output_path: str, + connecting_distance_meters: float, use_multiprocessing: bool) -> None: """Creates a labeled dataset by merging cloud labels and unlabeled examples. @@ -724,6 +851,7 @@ def create_labeled_examples( test_fraction: Fraction of examples to write to test output. train_output_path: Path to training examples TFRecord output. test_output_path: Path to test examples TFRecord output. + connecting_distance_meters: Maximum distance for two points to be connected. use_multiprocessing: If true, create multiple processes to create labeled examples. """ @@ -749,6 +877,7 @@ def create_labeled_examples( for path in label_file_paths: labels.extend(_read_label_file(path)) + logging.info('Read %d labels total.', len(labels)) ids_to_labels = collections.defaultdict(list) for example_id, string_label, dataset_id in labels: example_labels = ids_to_labels[example_id] @@ -766,5 +895,6 @@ def create_labeled_examples( test_fraction, train_output_path, test_output_path, + connecting_distance_meters, use_multiprocessing, ) diff --git a/src/skai/cloud_labeling_test.py b/src/skai/cloud_labeling_test.py index d92bc9cb..8e2f7591 100644 --- a/src/skai/cloud_labeling_test.py +++ b/src/skai/cloud_labeling_test.py @@ -19,8 +19,12 @@ from typing import List from absl.testing import absltest +from absl.testing import parameterized +import geopandas as gpd +import numpy as np import pandas as pd import PIL.Image +import shapely from skai import cloud_labeling import tensorflow as tf @@ -36,7 +40,7 @@ def _read_tfrecord(path: str) -> List[Example]: return examples -class CloudLabelingTest(absltest.TestCase): +class CloudLabelingTest(parameterized.TestCase): def testCreateLabelingImageBasic(self): before_image = PIL.Image.new('RGB', (64, 64)) @@ -85,17 +89,22 @@ def testCreateLabeledExamplesFromLabelFile(self): example.features.feature['encoded_coordinates'].bytes_list.value.append( str(i).encode() ) + example.features.feature['coordinates'].float_list.value.extend([i, i]) writer.write(example.SerializeToString()) # Create a label file. _, label_file_path = tempfile.mkstemp(dir=absltest.TEST_TMPDIR.value) - label_file_contents = pd.DataFrame([ - ('a', 'no_damage'), - ('b', 'minor_damage'), - ('c', 'major_damage'), - ('c', 'no_damage'), - ('d', 'destroyed'), - ('d', 'bad_example')], columns=['example_id', 'string_label']) + label_file_contents = pd.DataFrame( + [ + ('a', 'no_damage', [0, 0]), + ('b', 'minor_damage', [1, 1]), + ('c', 'major_damage', [2, 2]), + ('c', 'no_damage', [2, 2]), + ('d', 'destroyed', [3, 3]), + ('d', 'bad_example', [3, 3]), + ], + columns=['example_id', 'string_label', 'coordinates'], + ) label_file_contents.to_csv(label_file_path, index=False) _, train_path = tempfile.mkstemp(dir=absltest.TEST_TMPDIR.value) @@ -117,31 +126,199 @@ def testCreateLabeledExamplesFromLabelFile(self): test_fraction=0.333, train_output_path=train_path, test_output_path=test_path, + connecting_distance_meters=78, use_multiprocessing=False) all_examples = _read_tfrecord(train_path) + _read_tfrecord(test_path) self.assertLen(all_examples, 6) - id_to_float_label = [ - ( - e.features.feature['example_id'].bytes_list.value[0].decode(), - e.features.feature['label'].float_list.value[0], - ) - for e in all_examples - ] + id_to_float_label = [] + for e in all_examples: + id_to_float_label.append(( + e.features.feature['example_id'].bytes_list.value[0].decode(), + e.features.feature['label'].float_list.value[0], + list(e.features.feature['coordinates'].float_list.value), + )) self.assertSameElements( id_to_float_label, [ - ('a', 0.0), - ('b', 0.0), - ('c', 1.0), - ('c', 0.0), - ('d', 0.0), - ('d', 1.0), + ('a', 0.0, [0, 0]), + ('b', 0.0, [1, 1]), + ('c', 1.0, [2, 2]), + ('c', 0.0, [2, 2]), + ('d', 0.0, [3, 3]), + ('d', 1.0, [3, 3]), + ], + ) + + def testCreateLabeledExamplesFromLabelFileWithNoOverlap(self): + n_test = 100 + for _ in range(n_test): + # Create unlabeled examples. + _, unlabeled_examples_path = tempfile.mkstemp( + dir=absltest.TEST_TMPDIR.value + ) + # a is connected to d + # c is connected to b + # e + test_fraction = 0.4 + possible_test_ids = [['b', 'c'], ['c', 'b'], ['a', 'd'], ['d', 'a']] + with tf.io.TFRecordWriter(unlabeled_examples_path) as writer: + example_id_lon_lat = [ + ('a', [92.850449, 20.148951]), + ('b', [92.889694, 20.157515]), + ('c', [92.889740, 20.157454]), + ('d', [92.850479, 20.148664]), + ('e', [92.898537, 20.160021]), + ] + for i, (example_id, (lon, lat)) in enumerate(example_id_lon_lat): + example = Example() + example.features.feature['example_id'].bytes_list.value.append( + example_id.encode() + ) + example.features.feature[ + 'encoded_coordinates' + ].bytes_list.value.append(str(i).encode()) + example.features.feature['coordinates'].float_list.value.extend( + [lon, lat] + ) + + writer.write(example.SerializeToString()) + + # Create a label file. + _, label_file_path = tempfile.mkstemp(dir=absltest.TEST_TMPDIR.value) + label_file_contents = pd.DataFrame( + [ + ('a', 'no_damage', [92.850449, 20.148951]), + ('b', 'minor_damage', [92.889694, 20.157515]), + ('c', 'major_damage', [92.889740, 20.157454]), + ('d', 'destroyed', [92.850479, 20.148664]), + ('e', 'bad_example', [92.898537, 20.160021]), + ], + columns=['example_id', 'string_label', 'coordinates'], + ) + label_file_contents.to_csv(label_file_path, index=False) + + _, train_path = tempfile.mkstemp(dir=absltest.TEST_TMPDIR.value) + _, test_path = tempfile.mkstemp(dir=absltest.TEST_TMPDIR.value) + cloud_labeling.create_labeled_examples( + project=None, + location=None, + dataset_ids=[], + label_file_paths=[label_file_path], + string_to_numeric_labels=[ + 'no_damage=0', + 'minor_damage=0', + 'major_damage=1', + 'destroyed=1', + 'bad_example=0', + ], + export_dir=None, + examples_pattern=unlabeled_examples_path, + test_fraction=test_fraction, + train_output_path=train_path, + test_output_path=test_path, + connecting_distance_meters=78.0, + use_multiprocessing=False, + ) + + all_examples = _read_tfrecord(train_path) + _read_tfrecord(test_path) + test_examples = _read_tfrecord(test_path) + test_ids = [ + e.features.feature['example_id'].bytes_list.value[0].decode() + for e in test_examples + ] + self.assertLen(all_examples, 5) + self.assertIn(test_ids, possible_test_ids) + + @parameterized.parameters( + dict( + graph=[ + [0, 1, 1, 0, 0, 0, 1], + [0, 0, 1, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 1, 0, 0], + [0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0], + [0, 0, 0, 0, 0, 0, 0], + ], + correct_labels=[0, 0, 0, 1, 1, 2, 0], + ), + dict( + graph=[ + [0, 0, 0, 0], + [0, 0, 0, 0], + [0, 0, 0, 0], + [0, 0, 0, 0], + ], + correct_labels=[0, 1, 2, 3], + ), + dict( + graph=[ + [1, 1, 1, 1, 1, 1], + [1, 1, 1, 1, 1, 1], + [1, 1, 1, 1, 1, 1], + [1, 1, 1, 1, 1, 1], + [1, 1, 1, 1, 1, 1], + [1, 1, 1, 1, 1, 1], + ], + correct_labels=[0, 0, 0, 0, 0, 0], + ), + ) + def testGetConnectedLabels(self, graph, correct_labels): + labels = cloud_labeling.get_connected_labels(np.array(graph)) + + self.assertSequenceEqual(labels, correct_labels) + + +class GetConnectionMatrixTest(tf.test.TestCase): + def testGetConnectionMatrix(self): + encoded_coordinate_lon_lat = [ + ('a', [92.850449, 20.148951]), + ('b', [92.889694, 20.157515]), + ('c', [92.889740, 20.157454]), + ('d', [92.850479, 20.148664]), + ('e', [92.898537, 20.160021]), + ] + connecting_distance_meters = 78.0 + encoded_coordinates = [] + longitudes = [] + latitudes = [] + for example in encoded_coordinate_lon_lat: + encoded_coordinates.append(example[0]) + longitudes.append(example[1][0]) + latitudes.append(example[1][1]) + + correct_gdf = gpd.GeoDataFrame( + data=encoded_coordinates, + geometry=[ + shapely.geometry.Point(484370.937, 2227971.391), + shapely.geometry.Point(488472.931, 2228915.897), + shapely.geometry.Point(488477.734, 2228909.144), + shapely.geometry.Point(484374.044, 2227939.628), + shapely.geometry.Point(489397.202, 2229192.628), + ], + columns=['encoded_coordinates'], + ) + correct_connection_matrix = np.array( + [ + [1, 0, 0, 1, 0], + [0, 1, 1, 0, 0], + [0, 1, 1, 0, 0], + [1, 0, 0, 1, 0], + [0, 0, 0, 0, 1], ], + dtype=np.int32, ) + gpd_df, connection_matrix = cloud_labeling.get_connection_matrix( + longitudes, latitudes, encoded_coordinates, connecting_distance_meters + ) + + self.assertNDArrayNear(connection_matrix, correct_connection_matrix, 1e-15) + self.assertSameElements(gpd_df, correct_gdf) + if __name__ == '__main__': absltest.main() diff --git a/src/skai/utils.py b/src/skai/utils.py index 9568ad7b..475b669e 100644 --- a/src/skai/utils.py +++ b/src/skai/utils.py @@ -16,6 +16,7 @@ import base64 import io +import math import struct from typing import Iterable, List, Sequence, Tuple @@ -125,3 +126,15 @@ def encode_coordinates(longitude: float, latitude: float) -> str: def decode_coordinates(encoded_coords: str) -> Tuple[float, float]: buffer = base64.b16decode(encoded_coords.encode('ascii')) return struct.unpack('= 0: + epsg_code = '326' + utm_band + return epsg_code + epsg_code = '327' + utm_band + return epsg_code diff --git a/src/skai/utils_test.py b/src/skai/utils_test.py new file mode 100644 index 00000000..f670b7cd --- /dev/null +++ b/src/skai/utils_test.py @@ -0,0 +1,55 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for utils.""" +from absl.testing import absltest +from absl.testing import parameterized +from skai import utils + + +class UtilsTest(parameterized.TestCase): + + @parameterized.parameters( + dict( + longitude=38.676355712551015, + latitude=58.92948405901603, + correct_utm='32637', + ), + dict( + longitude=74.38574398625497, + latitude=6.082091014059927, + correct_utm='32643', + ), + dict( + longitude=19.09160680029879, + latitude=30.920059879467274, + correct_utm='32634', + ), + dict( + longitude=-16.321149967123773, + latitude=62.08112825201508, + correct_utm='32628', + ), + dict( + longitude=149.78080000677284, + latitude=25.31143284356746, + correct_utm='32655', + ) + ) + def testConvertWGStoUTM(self, longitude, latitude, correct_utm): + self.assertEqual(utils.convert_wgs_to_utm(longitude, latitude), correct_utm) + + +if __name__ == '__main__': + absltest.main()