Source code for macrosynergy.panel.converge_row
"""Convergence of rows for max weight"""
import numpy as np
[docs]class ConvergeRow(object):
"""
Class designed to receive a row of weights, where at least one weight in the
aforementioned row exceeds the permitted upper-bound, and subsequently redistributes
the excess evenly across all cross-sections.
Parameters
----------
row : ~numpy.ndarray
Array of weights.
max_weight : float
Maximum weight.
margin : float
Margin of error allowed in the convergence to within the upper-bound,
"max_weight".
max_loops : int
Controls the accuracy: in theory, the greater the number of loops allowed, the
more accurate the convergence. However, will only become significant if a tight
margin is imposed: the "looser" the margin, the less likely the maximum number of
loops permitted will be exceeded.
"""
def __init__(
self,
row: np.ndarray,
max_weight: float,
margin: float = 0.001,
max_loops: int = 25,
):
self.row = row
self.max_weight = max_weight
self.flag = (1 / np.sum(self.row > 0)) <= self.max_weight
self.margin = margin
self.max_loops = max_loops
[docs] @classmethod
def application(cls, row: np.ndarray, max_weight: float):
"""
Initiates the class and applies the redistribution of weights.
Parameters
----------
row : ~numpy.ndarray
Array of weights.
max_weight : float
Maximum weight.
Returns
-------
~numpy.ndarray
Array of weights.
"""
cr = ConvergeRow(row=row, max_weight=max_weight)
if cr.flag: # if enough non-nan values in row
cr.distribute_simple()
else: # apply equal weights to all non-nan cases
cr.row = (cr.row > 0) / np.sum(cr.row > 0)
cr.row[cr.row == 0.0] = np.nan
return cr.row
[docs] def distribute_simple(self):
"""
Initiates an indefinite While Loop until the weights converge below the
(max_weight + margin). Will evenly redistribute the excess weight across all
active cross-sections, and will compute the maximum weight, the number of cross-
sections above the threshold and the excess weight dynamically: through each
iteration.
"""
count = 0
close_enough = False
while (count <= self.max_loops) and (not close_enough):
count += 1
excesses = self.row - (self.max_weight + self.margin)
excesses[excesses <= 0] = 0
self.row = (self.row - excesses) + np.nanmean(excesses)
if np.max(self.row) <= (self.max_weight + self.margin):
close_enough = True