From 81c1bbb34a7d521d137b02ead558bbea0c9e419d Mon Sep 17 00:00:00 2001 From: Anand Date: Mon, 31 Jul 2023 17:47:54 +0530 Subject: [PATCH 1/3] ref issue - 119 - RSI indicator --- finquant/momentum_indicators.py | 82 +++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 finquant/momentum_indicators.py diff --git a/finquant/momentum_indicators.py b/finquant/momentum_indicators.py new file mode 100644 index 00000000..ded7a624 --- /dev/null +++ b/finquant/momentum_indicators.py @@ -0,0 +1,82 @@ +""" This module provides function(s) to compute momentum indicators +used in technical analysis such as RSI """ + +import matplotlib.pyplot as plt +import pandas as pd + +def relative_strength_index(data, window_length: int = 14, oversold: int = 30, + overbought: int = 70, standalone: bool = False) -> None: + """ Computes and visualizes a RSI graph, + plotted along with the prices in another sub-graph + for comparison. + + Ref: https://www.investopedia.com/terms/r/rsi.asp + + :Input + :data: pandas.Series or pandas.DataFrame with stock prices in columns + :window_length: Window length to compute RSI, default being 14 days + :oversold: Standard level for oversold RSI, default being 30 + :overbought: Standard level for overbought RSI, default being 70 + :standalone: Plot only the RSI graph + """ + if not isinstance(data, (pd.Series, pd.DataFrame)): + raise ValueError( + "data is expected to be of type pandas.Series or pandas.DataFrame" + ) + if isinstance(data, pd.DataFrame) and not len(data.columns.values) == 1: + raise ValueError("data is expected to have only one column.") + # checking integer fields + for field in (window_length, oversold, overbought): + if not isinstance(field, int): + raise ValueError(f"{field} must be an integer.") + # validating levels + if oversold >= overbought: + raise ValueError("oversold level should be < overbought level") + if oversold >= 100 or overbought >= 100: + raise ValueError("levels should be < 100") + # converting data to pd.DataFrame if it is a pd.Series (for subsequent function calls): + if isinstance(data, pd.Series): + data = data.to_frame() + # get the stock key + stock = data.keys()[0] + # calculate price differences + data['diff'] = data.diff(1) + # calculate gains and losses + data['gain'] = data['diff'].clip(lower = 0).round(2) + data['loss'] = data['diff'].clip(upper = 0).abs().round(2) + # placeholder + wl = window_length + # calculate rolling window mean gains and losses + data['avg_gain'] = data['gain'].rolling(window = wl, min_periods = wl).mean() + data['avg_loss'] = data['loss'].rolling(window = wl, min_periods = wl).mean() + # calculate WMS (wilder smoothing method) averages + for i, row in enumerate(data['avg_gain'].iloc[wl+1:]): + data['avg_gain'].iloc[i+wl+1] = (data['avg_gain'].iloc[i+wl]*(wl-1) +data['gain'].iloc[i+wl+1])/wl + for i, row in enumerate(data['avg_loss'].iloc[wl+1:]): + data['avg_loss'].iloc[i+wl+1] =(data['avg_loss'].iloc[i+wl]*(wl-1) + data['loss'].iloc[i+wl+1])/wl + # calculate RS values + data['rs'] = data['avg_gain']/data['avg_loss'] + # calculate RSI + data['rsi'] = 100 - (100/(1.0 + data['rs'])) + # Plot it + if standalone: + # Single plot + fig = plt.figure() + ax = fig.add_subplot(111) + ax.axhline(y = oversold, color = 'g', linestyle = '--') + ax.axhline(y = overbought, color = 'r', linestyle ='--') + data['rsi'].plot(ylabel = 'RSI', xlabel = 'Date', ax = ax, grid = True) + plt.title("RSI Plot") + plt.legend() + else: + # RSI against price in 2 plots + fig, ax = plt.subplots(2, 1, sharex=True, sharey=False) + ax[0].axhline(y = oversold, color = 'g', linestyle = '--') + ax[0].axhline(y = overbought, color = 'r', linestyle ='--') + ax[0].set_title('RSI + Price Plot') + # plot 2 graphs in 2 colors + colors = plt.rcParams["axes.prop_cycle"]() + data['rsi'].plot(ylabel = 'RSI', ax = ax[0], grid = True, color = next(colors)["color"], legend=True) + data[stock].plot(xlabel = 'Date', ylabel = 'Price', ax = ax[1], grid = True, + color = next(colors)["color"], legend = True) + plt.legend() From 12a8775fea68778e17983e025dc92dca600d1ff4 Mon Sep 17 00:00:00 2001 From: Anand Date: Tue, 1 Aug 2023 23:59:39 +0530 Subject: [PATCH 2/3] ref - issue #119 - Implementation of MACD momentum indicator --- finquant/momentum_indicators.py | 82 +++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/finquant/momentum_indicators.py b/finquant/momentum_indicators.py index ded7a624..ac1939b2 100644 --- a/finquant/momentum_indicators.py +++ b/finquant/momentum_indicators.py @@ -80,3 +80,85 @@ def relative_strength_index(data, window_length: int = 14, oversold: int = 30, data[stock].plot(xlabel = 'Date', ylabel = 'Price', ax = ax[1], grid = True, color = next(colors)["color"], legend = True) plt.legend() + +def macd(data, longer_ema_window: int = 26, shorter_ema_window: int = 12, + signal_ema_window: int = 9, standalone: bool = False) -> None: + """ + Computes and visualizes a MACD (Moving Average Convergence Divergence) + plotted along with price chart in another sub-graph for comparison. + + Ref: https://www.alpharithms.com/calculate-macd-python-272222/ + + :Input + :data: pandas.Series or pandas.DataFrame with stock prices in columns + :longer_ema_window: Window length (in days) for the longer EMA + :shorter_ema_window: Window length (in days) for the shorter EMA + :signal_ema_window: Window length (in days) for the signal + :standalone: If true, plot only the MACD signal + """ + + if not isinstance(data, (pd.Series, pd.DataFrame)): + raise ValueError( + "data is expected to be of type pandas.Series or pandas.DataFrame" + ) + if isinstance(data, pd.DataFrame) and not len(data.columns.values) == 1: + raise ValueError("data is expected to have only one column.") + # checking integer fields + for field in (longer_ema_window, shorter_ema_window, signal_ema_window): + if not isinstance(field, int): + raise ValueError(f"{field} must be an integer.") + # validating windows + if longer_ema_window < shorter_ema_window: + raise ValueError("longer ema window should be > shorter ema window") + if longer_ema_window < signal_ema_window: + raise ValueError("longer ema window should be > signal ema window") + + # converting data to pd.DataFrame if it is a pd.Series (for subsequent function calls): + if isinstance(data, pd.Series): + data = data.to_frame() + # get the stock key + stock = data.keys()[0] + # calculate EMA short period + ema_short = data.ewm(span=shorter_ema_window, adjust=False, min_periods=shorter_ema_window).mean() + # calculate EMA long period + ema_long = data.ewm(span=longer_ema_window, adjust=False, min_periods=longer_ema_window).mean() + # Subtract the longwer window EMA from the shorter window EMA to get the MACD + data['macd'] = ema_long - ema_short + # Get the signal window MACD for the Trigger line + data['macd_s'] = data['macd'].ewm(span=signal_ema_window, adjust=False, min_periods=signal_ema_window).mean() + # Calculate the difference between the MACD - Trigger for the Convergence/Divergence value + data['diff'] = data['macd'] - data['macd_s'] + hist = data['diff'] + + # Plot it + if standalone: + fig=plt.figure() + ax = fig.add_subplot(111) + data['macd'].plot(ylabel = 'MACD', xlabel='Date', ax = ax, grid = True, label='MACD', color='green', + linewidth=1.5, legend=True) + hist.plot(ax = ax, grid = True, label='diff', color='black', linewidth=0.5, legend=True) + data['macd_s'].plot(ax = ax, grid = True, label='SIGNAL', color='red', linewidth=1.5, legend=True) + + for i in range(len(hist)): + if hist[i] < 0: + ax.bar(data.index[i], hist[i], color = 'orange') + else: + ax.bar(data.index[i], hist[i], color = 'black') + else: + # RSI against price in 2 plots + fig, ax = plt.subplots(2, 1, sharex=True, sharey=False) + ax[0].set_title('MACD + Price Plot') + data['macd'].plot(ylabel = 'MACD', xlabel='Date', ax = ax[0], grid = True, + label='MACD', color='green', linewidth=1.5, legend=True) + hist.plot(ax = ax[0], grid = True, label='diff', color='black', linewidth=0.5, legend=True) + data['macd_s'].plot(ax = ax[0], grid = True, label='SIGNAL', color='red', linewidth=1.5, legend=True) + + for i in range(len(hist)): + if hist[i] < 0: + ax[0].bar(data.index[i], hist[i], color = 'orange') + else: + ax[0].bar(data.index[i], hist[i], color = 'black') + + data[stock].plot(xlabel = 'Date', ylabel = 'Price', ax = ax[1], grid = True, + color = 'orange', legend = True) + plt.legend() From 35846c8014f32238ca1402fdc723991b0a5c1ebf Mon Sep 17 00:00:00 2001 From: Anand Date: Sun, 6 Aug 2023 18:48:14 +0530 Subject: [PATCH 3/3] ref issue #119 - updated docs and added unit test --- example/Example-Analysis.py | 43 ++++++++++++ tests/test_momentum_indicators.py | 110 ++++++++++++++++++++++++++++++ 2 files changed, 153 insertions(+) create mode 100644 tests/test_momentum_indicators.py diff --git a/example/Example-Analysis.py b/example/Example-Analysis.py index ba0f4682..b9fa5ef3 100644 --- a/example/Example-Analysis.py +++ b/example/Example-Analysis.py @@ -309,3 +309,46 @@ # print(pf.data.loc[pf.data.index.year == 2017].head(3)) + +# + +# ## Momentum Indicators +# `FinQuant` provides a module `finquant.momentum_indicators` to compute and +# visualize a number of momentum indicators. Currently RSI (Relative Strength Index) +# and MACD (Moving Average Convergence Divergence) indicators are available. +# See below. + +# +# plot the RSI (Relative Strength Index) for disney stock proces +from finquant.momentum_indicators import relative_strength_index as rsi + +# get stock data for disney +dis = pf.get_stock("WIKI/DIS").data.copy(deep=True) + +# plot RSI - by default this plots RSI against the price in two graphs +rsi(dis) +plt.show() + +# plot RSI with custom arguments +rsi(dis, oversold = 20, overbought = 80) +plt.show() + +# plot RSI standalone graph +rsi(dis, oversold = 20, overbought = 80, standalone=True) +plt.show() + +# +# plot MACD for disney stock proces +from finquant.momentum_indicators import macd + +# plot MACD - by default this plots RSI against the price in two graphs +macd(dis) +plt.show() + +# plot MACD using custom arguments +macd(dis, longer_ema_window = 30, shorter_ema_window = 15, signal_ema_window = 10) +plt.show() + +# plot MACD standalone graph +macd(standlone = True) +plt.show() diff --git a/tests/test_momentum_indicators.py b/tests/test_momentum_indicators.py new file mode 100644 index 00000000..f3b091f6 --- /dev/null +++ b/tests/test_momentum_indicators.py @@ -0,0 +1,110 @@ +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd + +from finquant.momentum_indicators import ( + relative_strength_index as rsi, + macd, +) + +plt.switch_backend("Agg") + +def test_rsi(): + x = np.sin(np.linspace(1, 10, 100)) + xlabel_orig = "Date" + ylabel_orig = "Price" + df = pd.DataFrame({"Stock": x}, index=np.linspace(1, 10, 100)) + df.index.name = "Date" + rsi(df) + # get data from axis object + ax = plt.gca() + # ax.lines[0] is the data we passed to plot_bollinger_band + line1 = ax.lines[0] + stock_plot = line1.get_xydata() + xlabel_plot = ax.get_xlabel() + ylabel_plot = ax.get_ylabel() + # tests + assert (df['Stock'].index.values == stock_plot[:, 0]).all() + assert (df["Stock"].values == stock_plot[:, 1]).all() + assert xlabel_orig == xlabel_plot + assert ylabel_orig == ylabel_plot + +def test_rsi_standalone(): + x = np.sin(np.linspace(1, 10, 100)) + xlabel_orig = "Date" + ylabel_orig = "RSI" + labels_orig = ['rsi'] + title_orig = 'RSI Plot' + df = pd.DataFrame({"Stock": x}, index=np.linspace(1, 10, 100)) + df.index.name = "Date" + rsi(df, standalone=True) + # get data from axis object + ax = plt.gca() + # ax.lines[2] is the RSI data + line1 = ax.lines[2] + rsi_plot = line1.get_xydata() + xlabel_plot = ax.get_xlabel() + ylabel_plot = ax.get_ylabel() + print (xlabel_plot, ylabel_plot) + # tests + assert (df['rsi'].index.values == rsi_plot[:, 0]).all() + # for comparing values, we need to remove nan + a, b = df['rsi'].values, rsi_plot[:, 1] + a, b = map(lambda x: x[~np.isnan(x)], (a, b)) + assert (a == b).all() + labels_plot = ax.get_legend_handles_labels()[1] + title_plot = ax.get_title() + assert labels_plot == labels_orig + assert xlabel_plot == xlabel_orig + assert ylabel_plot == ylabel_orig + assert title_plot == title_orig + +def test_macd(): + x = np.sin(np.linspace(1, 10, 100)) + xlabel_orig = "Date" + ylabel_orig = "Price" + df = pd.DataFrame({"Stock": x}, index=np.linspace(1, 10, 100)) + df.index.name = "Date" + macd(df) + # get data from axis object + ax = plt.gca() + # ax.lines[0] is the data we passed to plot_bollinger_band + line1 = ax.lines[0] + stock_plot = line1.get_xydata() + xlabel_plot = ax.get_xlabel() + ylabel_plot = ax.get_ylabel() + # tests + assert (df['Stock'].index.values == stock_plot[:, 0]).all() + assert (df["Stock"].values == stock_plot[:, 1]).all() + assert xlabel_orig == xlabel_plot + assert ylabel_orig == ylabel_plot + +def test_macd_standalone(): + labels_orig = ['MACD', 'diff', 'SIGNAL'] + x = np.sin(np.linspace(1, 10, 100)) + xlabel_orig = "Date" + ylabel_orig = "MACD" + df = pd.DataFrame({"Stock": x}, index=np.linspace(1, 10, 100)) + df.index.name = "Date" + macd(df, standalone=True) + # get data from axis object + ax = plt.gca() + labels_plot = ax.get_legend_handles_labels()[1] + xlabel_plot = ax.get_xlabel() + ylabel_plot = ax.get_ylabel() + assert labels_plot == labels_orig + assert xlabel_plot == xlabel_orig + assert ylabel_plot == ylabel_orig + # ax.lines[0] is macd data + # ax.lines[1] is diff data + # ax.lines[2] is macd_s data + # tests + for i, key in ((0, 'macd'), (1, 'diff'), (2, 'macd_s')): + line = ax.lines[i] + data_plot = line.get_xydata() + # tests + assert (df[key].index.values == data_plot[:, 0]).all() + # for comparing values, we need to remove nan + a, b = df[key].values, data_plot[:, 1] + a, b = map(lambda x: x[~np.isnan(x)], (a, b)) + assert (a == b).all()