-
Notifications
You must be signed in to change notification settings - Fork 248
/
Copy pathtest_convergence_accelerators.py
143 lines (102 loc) · 6.59 KB
/
test_convergence_accelerators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import KratosMultiphysics as KM
import KratosMultiphysics.KratosUnittest as KratosUnittest
from KratosMultiphysics.CoSimulationApplication.coupling_interface_data import CouplingInterfaceData
from KratosMultiphysics.CoSimulationApplication.convergence_accelerators.convergence_accelerator_wrapper import ConvergenceAcceleratorWrapper
from testing_utilities import DummySolverWrapper
from unittest.mock import Mock, patch
import numpy as np
from random import uniform
if KM.IsDistributedRun():
import KratosMultiphysics.mpi as KratosMPI
class TestConvergenceAcceleratorWrapper(KratosUnittest.TestCase):
    """Tests of ConvergenceAcceleratorWrapper with a mocked convergence accelerator.

    The factory used by the wrapper is patched so that it hands out a ``Mock``.
    The tests then check which ranks call ``UpdateSolution``, which arguments
    it receives and how the returned update shows up in the interface data.
    """

    # import path of the factory function that is replaced by a mock in the tests
    _CREATE_CONV_ACC_TARGET = 'KratosMultiphysics.CoSimulationApplication.convergence_accelerators.convergence_accelerator_wrapper.CreateConvergenceAccelerator'

    def setUp(self):
        """Create a model part with a rank-dependent number of nodes and wrap PRESSURE as interface data."""
        self.model = KM.Model()
        self.model_part = self.model.CreateModelPart("default")
        for variable in (KM.PRESSURE, KM.PARTITION_INDEX):
            self.model_part.AddNodalSolutionStepVariable(variable)

        self.dimension = 3
        self.model_part.ProcessInfo[KM.DOMAIN_SIZE] = self.dimension

        data_comm = KM.Testing.GetDefaultDataCommunicator()
        self.my_pid = data_comm.Rank()

        # rank 4 gets no nodes in order to emulate a partition without local nodes,
        # every other rank gets between 3 and 7 nodes
        self.num_nodes = 0 if self.my_pid == 4 else self.my_pid % 5 + 3

        for node_index in range(self.num_nodes):
            # the coords are the same on different ranks, which does not matter for this test
            new_node = self.model_part.CreateNewNode(node_index, 0.1*node_index, 0.0, 0.0)
            new_node.SetSolutionStepValue(KM.PARTITION_INDEX, self.my_pid)
            new_node.SetSolutionStepValue(KM.PRESSURE, uniform(-10, 50))

        if KM.IsDistributedRun():
            KratosMPI.ParallelFillCommunicator(self.model_part, data_comm).Execute()

        data_settings = KM.Parameters("""{
"model_part_name" : "default",
"variable_name" : "PRESSURE"
}""")

        self.interface_data = CouplingInterfaceData(data_settings, self.model)
        self.dummy_solver_wrapper = DummySolverWrapper({"data_4_testing" : self.interface_data})

    def _create_conv_acc_settings(self):
        """Return a fresh settings object for the (mocked) convergence accelerator."""
        return KM.Parameters("""{
"type" : "patched_mock_testing",
"data_name" : "data_4_testing"
}""")

    def _random_nodal_values(self):
        """Return one random value per local node as a list."""
        return [uniform(-10, 50) for _ in range(self.num_nodes)]

    def _gather_on_root(self, local_values):
        """Gather ``local_values`` of all ranks with rank 0 as destination and concatenate them into one array."""
        gathered = KM.Testing.GetDefaultDataCommunicator().GathervDoubles(local_values, 0)
        return np.array(np.concatenate(gathered))

    def test_accelerator_without_support_for_distributed_data(self):
        """Check the wrapper with an accelerator that reports no support for distributed data.

        Only rank 0 is expected to call ``UpdateSolution``, with the residual and
        the input gathered from all ranks. Afterwards the interface data of each
        rank is expected to be its input plus its local part of the update.
        """
        conv_acc_settings = self._create_conv_acc_settings()

        exp_inp = self.interface_data.GetData()

        update_solution_return_value = self._random_nodal_values()
        global_update_solution_return_value = self._gather_on_root(update_solution_return_value)

        conv_acc_mock = Mock()
        conv_acc_mock.configure_mock(**{
            'SupportsDistributedData.return_value': False,
            'UpdateSolution.return_value' : global_update_solution_return_value,
        })

        with patch(self._CREATE_CONV_ACC_TARGET, return_value=conv_acc_mock):
            conv_acc_wrapper = ConvergenceAcceleratorWrapper(
                conv_acc_settings,
                self.dummy_solver_wrapper.interface_data_dict,
                KM.Testing.GetDefaultDataCommunicator())

        conv_acc_wrapper.InitializeSolutionStep()

        self.assertEqual(conv_acc_mock.SupportsDistributedData.call_count, 1)
        # gathering and scattering is only needed if the data is distributed
        self.assertEqual(conv_acc_wrapper.gather_scatter_required, self.interface_data.IsDistributed())
        self.assertEqual(conv_acc_wrapper.executing_rank, self.my_pid == 0)

        conv_acc_wrapper.InitializeNonLinearIteration()

        # a new solution is set such that a residual can be computed
        new_solution = self._random_nodal_values()
        self.interface_data.SetData(new_solution)
        exp_res = new_solution - exp_inp

        conv_acc_wrapper.ComputeAndApplyUpdate()

        # "UpdateSolution" is called by rank 0 only
        self.assertEqual(conv_acc_mock.UpdateSolution.call_count, int(self.my_pid == 0))

        global_exp_res = self._gather_on_root(exp_res)
        global_exp_inp = self._gather_on_root(exp_inp)

        if self.my_pid == 0:
            # the assert-functions of the mock cannot compare numpy arrays, hence numpy is used for the comparison
            positional_args = conv_acc_mock.UpdateSolution.call_args[0]
            np.testing.assert_array_equal(global_exp_res, positional_args[0])
            np.testing.assert_array_equal(global_exp_inp, positional_args[1])

        np.testing.assert_array_equal(exp_inp + update_solution_return_value, self.interface_data.GetData())

    def test_accelerator_with_support_for_distributed_data(self):
        """Check the wrapper with an accelerator that reports support for distributed data.

        No gather-scatter is expected and every rank is expected to call
        ``UpdateSolution`` once with its local residual and input. Afterwards the
        interface data is expected to be the input plus the returned update.
        """
        conv_acc_settings = self._create_conv_acc_settings()

        update_solution_return_value = self._random_nodal_values()

        conv_acc_mock = Mock()
        conv_acc_mock.configure_mock(**{
            'SupportsDistributedData.return_value': True,
            'UpdateSolution.return_value' : update_solution_return_value,
        })

        with patch(self._CREATE_CONV_ACC_TARGET, return_value=conv_acc_mock):
            conv_acc_wrapper = ConvergenceAcceleratorWrapper(
                conv_acc_settings,
                self.dummy_solver_wrapper.interface_data_dict,
                KM.Testing.GetDefaultDataCommunicator())

        exp_inp = self.interface_data.GetData()

        self.assertEqual(conv_acc_mock.SupportsDistributedData.call_count, 1)
        self.assertFalse(conv_acc_wrapper.gather_scatter_required)
        self.assertTrue(conv_acc_wrapper.executing_rank)

        conv_acc_wrapper.InitializeSolutionStep()
        conv_acc_wrapper.InitializeNonLinearIteration()

        # a new solution is set such that a residual can be computed
        new_solution = self._random_nodal_values()
        self.interface_data.SetData(new_solution)
        exp_res = new_solution - exp_inp

        conv_acc_wrapper.ComputeAndApplyUpdate()

        self.assertEqual(conv_acc_mock.UpdateSolution.call_count, 1)

        # the assert-functions of the mock cannot compare numpy arrays, hence numpy is used for the comparison
        positional_args = conv_acc_mock.UpdateSolution.call_args[0]
        np.testing.assert_array_equal(exp_res, positional_args[0])
        np.testing.assert_array_equal(exp_inp, positional_args[1])
        np.testing.assert_array_equal(exp_inp + update_solution_return_value, self.interface_data.GetData())
if __name__ == '__main__':
    # hand over to the Kratos unittest entry point when this file is executed as a script
    KratosUnittest.main()