forked from TheAlgorithms/Python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
horn_schunck.py
131 lines (103 loc) · 4.19 KB
/
horn_schunck.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
"""
The Horn-Schunck method estimates the optical flow for every single pixel of
a sequence of images.
It works by assuming brightness constancy between two consecutive frames
and smoothness in the optical flow.
Useful resources:
Wikipedia: https://en.wikipedia.org/wiki/Horn%E2%80%93Schunck_method
Paper: http://image.diku.dk/imagecanon/material/HornSchunckOptical_Flow.pdf
"""
from typing import SupportsIndex
import numpy as np
from scipy.ndimage import convolve
def warp(
image: np.ndarray, horizontal_flow: np.ndarray, vertical_flow: np.ndarray
) -> np.ndarray:
"""
Warps the pixels of an image into a new image using the horizontal and vertical
flows.
Pixels that are warped from an invalid location are set to 0.
Parameters:
image: Grayscale image
horizontal_flow: Horizontal flow
vertical_flow: Vertical flow
Returns: Warped image
>>> warp(np.array([[0, 1, 2], [0, 3, 0], [2, 2, 2]]), \
np.array([[0, 1, -1], [-1, 0, 0], [1, 1, 1]]), \
np.array([[0, 0, 0], [0, 1, 0], [0, 0, 1]]))
array([[0, 0, 0],
[3, 1, 0],
[0, 2, 3]])
"""
flow = np.stack((horizontal_flow, vertical_flow), 2)
# Create a grid of all pixel coordinates and subtract the flow to get the
# target pixels coordinates
grid = np.stack(
np.meshgrid(np.arange(0, image.shape[1]), np.arange(0, image.shape[0])), 2
)
grid = np.round(grid - flow).astype(np.int32)
# Find the locations outside of the original image
invalid = (grid < 0) | (grid >= np.array([image.shape[1], image.shape[0]]))
grid[invalid] = 0
warped = image[grid[:, :, 1], grid[:, :, 0]]
# Set pixels at invalid locations to 0
warped[invalid[:, :, 0] | invalid[:, :, 1]] = 0
return warped
def horn_schunck(
image0: np.ndarray,
image1: np.ndarray,
num_iter: SupportsIndex,
alpha: float | None = None,
) -> tuple[np.ndarray, np.ndarray]:
"""
This function performs the Horn-Schunck algorithm and returns the estimated
optical flow. It is assumed that the input images are grayscale and
normalized to be in [0, 1].
Parameters:
image0: First image of the sequence
image1: Second image of the sequence
alpha: Regularization constant
num_iter: Number of iterations performed
Returns: estimated horizontal & vertical flow
>>> np.round(horn_schunck(np.array([[0, 0, 2], [0, 0, 2]]), \
np.array([[0, 2, 0], [0, 2, 0]]), alpha=0.1, num_iter=110)).\
astype(np.int32)
array([[[ 0, -1, -1],
[ 0, -1, -1]],
<BLANKLINE>
[[ 0, 0, 0],
[ 0, 0, 0]]], dtype=int32)
"""
if alpha is None:
alpha = 0.1
# Initialize flow
horizontal_flow = np.zeros_like(image0)
vertical_flow = np.zeros_like(image0)
# Prepare kernels for the calculation of the derivatives and the average velocity
kernel_x = np.array([[-1, 1], [-1, 1]]) * 0.25
kernel_y = np.array([[-1, -1], [1, 1]]) * 0.25
kernel_t = np.array([[1, 1], [1, 1]]) * 0.25
kernel_laplacian = np.array(
[[1 / 12, 1 / 6, 1 / 12], [1 / 6, 0, 1 / 6], [1 / 12, 1 / 6, 1 / 12]]
)
# Iteratively refine the flow
for _ in range(num_iter):
warped_image = warp(image0, horizontal_flow, vertical_flow)
derivative_x = convolve(warped_image, kernel_x) + convolve(image1, kernel_x)
derivative_y = convolve(warped_image, kernel_y) + convolve(image1, kernel_y)
derivative_t = convolve(warped_image, kernel_t) + convolve(image1, -kernel_t)
avg_horizontal_velocity = convolve(horizontal_flow, kernel_laplacian)
avg_vertical_velocity = convolve(vertical_flow, kernel_laplacian)
# This updates the flow as proposed in the paper (Step 12)
update = (
derivative_x * avg_horizontal_velocity
+ derivative_y * avg_vertical_velocity
+ derivative_t
)
update = update / (alpha**2 + derivative_x**2 + derivative_y**2)
horizontal_flow = avg_horizontal_velocity - derivative_x * update
vertical_flow = avg_vertical_velocity - derivative_y * update
return horizontal_flow, vertical_flow
if __name__ == "__main__":
import doctest
doctest.testmod()