__version__ = '1.0.0'
import csv
import datetime
import importlib
from abc import abstractmethod
from collections import defaultdict
from typing import Dict, List, Set
import numpy as np
import pandas as pd
from dateutil import relativedelta as rdelta
import logging
from functools import partial
import calendar
from openpyxl import Workbook
from scipy.interpolate import interp1d
import json
from typing import Callable
import pint
import pint_pandas
__author__ = 'schien'
# import pkg_resources # part of setuptools
# version = pkg_resources.require("excel-modelling-helper")[0].version
# Header-to-keyword maps, one per definition-format version. The keys appear to
# be spreadsheet column headers; the non-empty values match keyword names used
# by the parameter and generator constructors. Several headers map to an empty
# string - how those entries are treated is decided by the consuming code.
param_name_map_v1 = {
    'variable': 'name',
    'scenario': 'source_scenarios_string',
    'module': 'module_name',
    'distribution': 'distribution_name',
    'param 1': 'param_a',
    'param 2': 'param_b',
    'param 3': 'param_c',
    'unit': '',
    'CAGR': 'cagr',
    'ref date': 'ref_date',
    'label': '',
    'tags': '',
    'comment': '',
    'source': '',
}
param_name_map_v2 = {
    'CAGR': 'cagr',
    'comment': '',
    'label': '',
    'mean growth': 'growth_factor',
    'param': '',
    'ref date': 'ref_date',
    'ref value': '',
    'scenario': 'source_scenarios_string',
    'source': '',
    'tags': '',
    'type': '',
    'unit': '',
    'variability growth': 'ef_growth_factor',
    'initial_value_proportional_variation': '',
    'variable': 'name',
}
# Lookup of the header map by format version number.
param_name_maps = {
    1: param_name_map_v1,
    2: param_name_map_v2,
}
# logger.basicConfig(level=logger.DEBUG)
# Module-wide logger shared by all classes and functions below.
logger = logging.getLogger(__name__)
class DistributionFunctionGenerator(object):
    """
    Draws samples from a distribution function that is looked up by module and function name.

    The function called ``distribution_name`` is fetched from the module called ``module_name`` and
    invoked with the collected positional parameters plus a ``size`` keyword argument.
    """
    module: str
    distribution: str
    param_a: str
    param_b: str
    param_c: str

    def __init__(self, module_name=None, distribution_name=None, param_a: float = None,
                 param_b: float = None, param_c: float = None, size=None, **kwargs):
        """
        Instantiate a new object.

        :param module_name: name of the module that provides the distribution function
        :param distribution_name: name of the distribution function inside that module
        :param param_a: first positional argument of the distribution function; for the 'choice'
            distribution this may instead be a string of comma-separated numbers
        :param param_b: second positional argument (optional)
        :param param_c: third positional argument (optional)
        :param size: default sample size
        :param kwargs: can contain key "sample_mean_value" with bool value
        """
        self.kwargs = kwargs
        self.size = size
        self.module_name = module_name
        self.distribution_name = distribution_name
        self.sample_mean_value = kwargs.get('sample_mean_value', False)

        # prepare function arguments
        if distribution_name == 'choice':
            # 'choice' takes a single positional argument: the array of options to pick from.
            if isinstance(param_a, str):
                options = [float(token.strip()) for token in param_a.split(',')]
            else:
                # Only unset parameters are skipped - a plain truthiness test would also
                # discard a legitimate option with value 0.
                options = [p for p in (param_a, param_b, param_c) if p not in (None, "")]
            # The builtin ``float`` replaces the ``np.float`` alias that newer NumPy versions removed.
            self.random_function_params = [np.array(options, dtype=float)]
            logger.debug(f'setting function params for choice distribution {self.random_function_params}')
        else:
            self.random_function_params = [p for p in (param_a, param_b, param_c) if p not in (None, "")]

    def get_mean(self, distribution_function):
        """Get the mean value for a distribution.

        If the distribution function is one of [normal, uniform, choice, triangular] the analytic value is
        calculated from the parameters. Otherwise the distribution is sampled and the mean of the sample is
        returned.

        :param distribution_function: zero-argument callable returning a sample (only used for the fallback)
        :return: the mean as a scalar
        """
        name = self.distribution_name
        params = self.random_function_params
        if name == 'normal':
            return params[0]
        if name == 'uniform':
            return (params[0] + params[1]) / 2.
        if name == 'choice':
            return params[0].mean()
        if name == 'triangular':
            return (params[0] + params[1] + params[2]) / 3.
        return distribution_function().mean()

    def generate_values(self, *args, **kwargs):
        """
        Generate a sample of values by sampling from a distribution. The size of the sample can be overridden
        with the 'size' kwarg.
        If `self.sample_mean_value == True` the sample will contain "size" times the mean value.

        :param args: ignored
        :param kwargs: only 'size' is read
        :return: sample as vector of given size
        # todo: this will break if it receives data from a spreadsheet with no module column (almost all of them)
        """
        sample_size = kwargs.get('size', self.size)
        f = self.instantiate_distribution_function(self.module_name, self.distribution_name)
        distribution_function = partial(f, *self.random_function_params, size=sample_size)
        if self.sample_mean_value:
            return np.full(sample_size, self.get_mean(distribution_function))
        return distribution_function()

    @staticmethod
    def instantiate_distribution_function(module_name, distribution_name):
        """
        Import ``module_name`` and return its attribute ``distribution_name``.

        :param module_name: importable module name
        :param distribution_name: attribute name of the function inside the module
        :return: the looked-up function
        """
        module = importlib.import_module(module_name)
        return getattr(module, distribution_name)
class Parameter(object):
    """
    A single parameter.

    Calling the instance generates a sample through one of the generator classes and caches it;
    later calls return the cached value until ``cache`` is reset to None.
    """
    version: int
    name: str
    unit: str
    comment: str
    source: str
    scenario: str
    processes: Dict[str, List]
    # optional comma-separated list of tags
    tags: str

    def __init__(self, name, tags=None, source_scenarios_string: str = None, unit: str = None,
                 comment: str = None, source: str = None, version=None,
                 **kwargs):
        """
        :param name: the parameter name
        :param tags: optional comma-separated list of tags
        :param source_scenarios_string: the source definition of scenarios, a comma-separated list
        :param unit: unit string, handed to the generator when sampling
        :param comment: free-text comment
        :param source: free-text source
        :param version: definition format version; version 2 selects GrowthTimeSeriesGenerator for time series
        :param kwargs: additional arguments passed on to the generator constructor
        """
        self.version = version
        self.source = source
        self.comment = comment
        self.unit = unit
        # The source definition of scenarios. A comma-separated list
        self.source_scenarios_string = source_scenarios_string
        self.tags = tags
        self.name = name
        self.scenario = None
        self.cache = None
        # track the usages of this parameter per process as a list of
        # process-specific variable names that are backed by this parameter
        self.processes = defaultdict(list)
        self.kwargs = kwargs

    def __call__(self, settings=None, *args, **kwargs):
        """
        Samples from a parameter. Values are cached and the same value is returned every time called.

        @todo confusing interface that accepts 'settings' and kwargs at the same time.
        settings, kwargs and args are ignored after the first call because of caching

        :param settings: optional dict; keys read here are 'sample_size', 'sample_mean_value', 'with_pint_units',
            'use_time_series', 'times' (required when 'use_time_series' is set), 'with_group', 'group_vars'
            and 'groupings'
        :param args: pass-through to generator
        :param kwargs: pass-through to generator; 'name', 'unit', 'tags' and 'scenario' are always overwritten
            with the values of this parameter
        :return: the (cached) sample
        """
        if self.cache is not None:
            return self.cache

        settings = settings or {}
        kwargs['name'] = self.name
        kwargs['unit'] = self.unit
        kwargs['tags'] = self.tags
        kwargs['scenario'] = self.scenario

        generator = self._create_generator(settings)

        # is this a group variable?
        # @todo refactor - use 'with_group' as a global switch and auto-lookup country variables as you go along
        if settings.get('with_group'):
            # A missing 'group_vars' entry is treated as "no group variables" instead of failing on `in None`.
            is_group_var = self.name in (settings.get('group_vars') or ())
            kwargs['with_group'] = is_group_var
            kwargs['group_flag'] = is_group_var
            kwargs['groupings'] = settings.get('groupings')

        self.cache = generator.generate_values(*args, **kwargs)
        return self.cache

    def _create_generator(self, settings):
        """Build the generator matching the settings and the definition version of this parameter."""
        common_args = {
            'size': settings.get('sample_size', 1),
            'sample_mean_value': settings.get('sample_mean_value', False),
            'with_pint_units': settings.get('with_pint_units', False),
        }
        # parameter-specific arguments take precedence over the common settings
        common_args.update(**self.kwargs)

        if not settings.get('use_time_series', False):
            return DistributionFunctionGenerator(**common_args)
        if self.version == 2:
            return GrowthTimeSeriesGenerator(**common_args, times=settings['times'])
        return ConstantUncertaintyExponentialGrowthTimeSeriesGenerator(**common_args, times=settings['times'])

    def add_usage(self, process_name, variable_name):
        """Record that variable ``variable_name`` of process ``process_name`` is backed by this parameter."""
        self.processes[process_name].append(variable_name)
class GrowthTimeSeriesGenerator(DistributionFunctionGenerator):
    """
    Generates a monthly time series per sample (and optionally per group).

    The mean values follow either growth coefficients applied to a reference value (type 'exp') or an
    interpolation between dated reference values (type 'interp'). Triangular noise, scaled by its own
    growth coefficients, is added to the mean unless ``sample_mean_value`` is set.

    Keys read from the keyword arguments stored by the base class: 'type', 'ref value', 'growth_factor',
    'ef_growth_factor', 'initial_value_proportional_variation' and, for type 'interp', 'param'
    (the interpolation kind). In group mode each of the per-variable values may be a dict keyed by group.
    """
    ref_date: str
    # of the mean values
    # the type of growth ['exp']
    # growth_function_type: str
    # of the error function
    variance: str
    # error function growth rate
    ef_growth_factor: str

    def __init__(self, times=None, size=None, index_names=None, ref_date=None, with_pint_units=False, *args, **kwargs):
        """
        :param times: time index; its ``freq`` must be a month-begin offset
        :param size: number of samples per time step
        :param index_names: level names for the (time, sample) multi index stored on the instance
        :param ref_date: reference date handed to ``growth_coefficients``
        :param with_pint_units: if true the generated series gets a ``pint[<unit>]`` dtype
        :param args: passed to the base class
        :param kwargs: passed to the base class, which stores them as ``self.kwargs``
        """
        super().__init__(*args, **kwargs)
        self.ref_date = ref_date if ref_date else None
        self.with_pint_units = with_pint_units
        if self.with_pint_units:
            # presumably imported here so a missing pint installation surfaces at construction time
            import pint
        self.times = times
        self.size = size
        self._multi_index = pd.MultiIndex.from_product([times, range(0, size)], names=index_names)
        assert isinstance(times.freq, pd.tseries.offsets.MonthBegin), 'Time index must have monthly frequency'

    def _group_setting(self, key, group=None):
        """Return ``self.kwargs[key]``; when it is a dict and a group is given, return the entry of that group."""
        value = self.kwargs[key]
        if group is not None and isinstance(value, dict):
            return value[group]
        return value

    def generate_sigmas(self, group=None):
        """
        Sample the noise term from a symmetric triangular distribution around zero.

        The half-width of the distribution is the initial value multiplied by
        'initial_value_proportional_variation'. For type 'interp' the initial value is the value of the
        earliest dated entry of 'ref value', otherwise it is 'ref value' itself.

        :param group: optional group whose settings are used when a setting is a per-group dict
        :return: array of shape (len(times), size)
        """
        ref_value = self._group_setting('ref value', group)
        if self.kwargs['type'] == 'interp':
            def get_date(record):
                return datetime.datetime.strptime(record[0], "%Y-%m-%d")

            dated_values = sorted(json.loads(ref_value.strip()).items(), key=get_date)
            initial_value = dated_values[0][1]
        else:
            initial_value = float(ref_value)

        variability_ = initial_value * self._group_setting('initial_value_proportional_variation', group)
        logger.debug(f'sampling random distribution with parameters -{variability_}, 0, {variability_}')
        return np.random.triangular(-1 * variability_, 0, variability_, (len(self.times), self.size))

    def generate_values(self, *args, **kwargs):
        """
        Instantiate a random variable and apply annual growth factors.

        :param args: ignored
        :param kwargs: 'name' is required; 'unit' is read when pint units are enabled; a truthy 'with_group'
            switches to one series entry per (time, sample, group) for the groups listed in 'groupings'
        :return: pd.Series indexed by (time, samples) or, in group mode, (time, samples, group)
        :raises Exception: if no reference date is set
        """
        assert 'ref value' in self.kwargs
        start_date = self.times[0].to_pydatetime()
        end_date = self.times[-1].to_pydatetime()
        ref_date = self.ref_date
        if not ref_date:
            raise Exception(f"Ref date not set for variable {kwargs['name']}")

        with_group = kwargs.get('with_group')
        n_times = len(self.times)

        # 1. + 2. Generate $\mu$ including its growth
        if with_group:
            mu = {c: self.generate_mu(end_date, ref_date, start_date, group=c, **kwargs)
                  for c in kwargs['groupings']}
        else:
            mu = self.generate_mu(end_date, ref_date, start_date, **kwargs)

        # 3. Generate $\sigma$ - all zeros when only the mean is requested
        if self.sample_mean_value:
            sigma = np.zeros((n_times, self.size))
        elif with_group:
            sigma = {c: self.generate_sigmas(group=c) for c in kwargs['groupings']}
        else:
            sigma = self.generate_sigmas()

        # 4. Prepare growth array for $\alpha_{sigma}$
        if with_group:
            alpha_sigma = {c: growth_coefficients(start_date, end_date, ref_date,
                                                  self._group_setting('ef_growth_factor', c), 1)
                           for c in kwargs['groupings']}
        else:
            alpha_sigma = growth_coefficients(start_date, end_date, ref_date,
                                              self.kwargs['ef_growth_factor'], 1)

        name = kwargs['name']
        if self.with_pint_units:
            unit_ = kwargs["unit"]
            if not unit_:
                unit_ = 'dimensionless'
            dtype = f'pint[{unit_}]'
        else:
            dtype = 'float64'

        # 5. Apply growth to $\sigma$, add $\mu$ and build the series
        if with_group:
            groupings = kwargs['groupings']
            group_multi_index = pd.MultiIndex.from_product([self.times, range(self.size), groupings],
                                                           names=['time', 'samples', 'group'])
            per_group = []
            for c in groupings:
                noise = sigma if self.sample_mean_value else sigma[c]
                # (times, samples) matrix of this group
                per_group.append(alpha_sigma[c] * noise + mu[c].reshape(n_times, 1))
            # Stacking on a trailing axis and flattening yields the time -> sample -> group order of the index.
            data = np.stack(per_group, axis=-1).ravel() if per_group else []
            series = pd.Series(data, index=group_multi_index, dtype=dtype)
        else:
            from dateutil import relativedelta
            r = relativedelta.relativedelta(end_date, start_date)
            months = r.years * 12 + r.months + 1
            _multi_index = pd.MultiIndex.from_product([self.times, range(self.size)], names=['time', 'samples'])
            series = pd.Series(((sigma * alpha_sigma) + mu.reshape(months, 1)).ravel(), index=_multi_index,
                               dtype=dtype)

        # warn if the series has values that are not strictly positive
        non_positive = series[series <= 0]
        magnitudes = non_positive.pint.m if self.with_pint_units else non_positive
        if not magnitudes.empty:
            logger.warning(f"Negative values for parameter {name} from {non_positive.index[0][0]}")
        return series

    def generate_mu(self, end_date, ref_date, start_date, group=None, **kwargs):
        """
        Generate the mean values for every time step.

        :param end_date: last date of the time index
        :param ref_date: reference date of the growth coefficients (type 'exp')
        :param start_date: first date of the time index
        :param group: optional group whose settings are used when a setting is a per-group dict
        :param kwargs: 'name' is only read for the error message
        :return: array of shape (len(times), 1) for type 'exp'; the interpolated values, one per time step,
            for type 'interp'
        :raises Exception: if 'type' is neither 'exp' nor 'interp'
        """
        growth_type = self.kwargs['type']

        if growth_type == 'exp':
            mu_bar = np.full(len(self.times), float(self._group_setting('ref value', group)))
            # Apply Growth to Mean Values $\alpha_{mu}$
            alpha_mu = growth_coefficients(start_date, end_date, ref_date,
                                           self._group_setting('growth_factor', group), 1)
            return (mu_bar * alpha_mu.ravel()).reshape(len(self.times), 1)

        if growth_type == 'interp':
            def to_timestamp(d):
                return calendar.timegm(d.timetuple())

            # 'ref value' holds a JSON object mapping 'YYYY-MM-DD' dates to values
            dated_values = json.loads(self._group_setting('ref value', group).strip())
            known_times = np.array([to_timestamp(datetime.datetime.strptime(day, '%Y-%m-%d'))
                                    for day in dated_values])
            known_values = np.array(list(dated_values.values()))
            # fall back to linear interpolation when no kind is configured
            kind = self.kwargs.get('param') or 'linear'
            f = interp1d(known_times, known_values, kind=kind, fill_value='extrapolate')
            return f([to_timestamp(t) for t in self.times])

        raise Exception(f"no variable type set for variable {kwargs['name']}")
class ConstantUncertaintyExponentialGrowthTimeSeriesGenerator(DistributionFunctionGenerator):
    """
    Samples the configured distribution once per (time, sample) cell and scales every sample with the
    growth coefficient of its time step.
    """
    cagr: str
    ref_date: str

    def __init__(self, cagr=None, times=None, size=None, index_names=None, ref_date=None, with_pint_units=False, *args,
                 **kwargs):
        """
        :param cagr: growth rate handed to ``growth_coefficients``; a falsy value means 0
        :param times: time index; its ``freq`` must be a month-begin offset
        :param size: number of samples per time step
        :param index_names: level names of the (time, sample) multi index of the generated series
        :param ref_date: reference date of the growth; the first time step is used when not set
        :param with_pint_units: if true the generated series gets a ``pint[<unit>]`` dtype
        :param args: passed to the base class
        :param kwargs: passed to the base class
        """
        super().__init__(*args, **kwargs)
        self.cagr = cagr if cagr else 0
        self.ref_date = ref_date if ref_date else None
        self.with_pint_units = with_pint_units
        if self.with_pint_units:
            # presumably imported here so a missing pint installation surfaces at construction time
            import pint
        self.times = times
        self.size = size
        self._multi_index = pd.MultiIndex.from_product([times, range(0, size)], names=index_names)
        assert isinstance(times.freq, pd.tseries.offsets.MonthBegin), 'Time index must have monthly frequency'

    def generate_values(self, *args, **kwargs):
        """
        Instantiate a random variable and apply annual growth factors.

        :param args: passed to the base class sampler
        :param kwargs: passed to the base class sampler (must not contain 'size'); 'unit' is read when pint
            units are enabled
        :return: pd.Series indexed by the (time, sample) multi index
        """
        # one draw per (time, sample) cell, flattened in the order of the multi index
        values = super().generate_values(*args, **kwargs, size=(len(self.times) * self.size,))

        start_date = self.times[0].to_pydatetime()
        end_date = self.times[-1].to_pydatetime()
        # @todo - fill to cover the entire time: define rules for filling first
        ref_date = self.ref_date if self.ref_date else start_date

        coefficients = growth_coefficients(start_date, end_date, ref_date, self.cagr, self.size)
        values = np.multiply(values, coefficients.ravel())

        if self.with_pint_units:
            unit_ = kwargs["unit"] if kwargs["unit"] else 'dimensionless'
            dtype = f'pint[{unit_}]'
        else:
            dtype = 'float64'

        return pd.Series(values, index=self._multi_index, dtype=dtype)
def growth_coefficients(start_date, end_date, ref_date, alpha, samples):
    """
    Build a matrix of growth factors according to the CAGR formula y'=y0 (1+a)^(t'-t0).

    a growth rate alpha
    t0 start date
    t' end date
    y' output
    y0 start value

    The result has one row per whole month between start_date and end_date (both included) and
    ``samples`` identical columns. The factor in the row of ref_date is 1; a row k months after it holds
    (1 + alpha)^(k/12). If ref_date lies outside of [start_date, end_date] the rows are computed for the
    range extended to ref_date and the surplus rows are cut off again.

    NOTE(review): a row k months *before* ref_date holds (1 - alpha)^(k/12), which is not the exact
    inverse (1 + alpha)^(-k/12) of the formula above - confirm this is intended before changing it.

    :param start_date: first date of the range
    :param end_date: last date of the range
    :param ref_date: date at which the factor is 1
    :param alpha: annual growth rate
    :param samples: number of columns
    :return: 2-d array of growth factors
    """
    # extend the range so that it contains the ref date; remember how many rows to drop afterwards
    start_offset = 0
    if ref_date < start_date:
        offset_delta = rdelta.relativedelta(start_date, ref_date)
        start_offset = offset_delta.months + 12 * offset_delta.years
        start_date = ref_date

    end_offset = 0
    if ref_date > end_date:
        offset_delta = rdelta.relativedelta(ref_date, end_date)
        end_offset = offset_delta.months + 12 * offset_delta.years
        end_date = ref_date

    delta_before = rdelta.relativedelta(ref_date, start_date)
    months_before = delta_before.months + 12 * delta_before.years
    delta_after = rdelta.relativedelta(end_date, ref_date)
    months_after = delta_after.months + 12 * delta_after.years

    def factors_before(row, _column):
        # row 0 is the ref date itself, row k lies k months before it
        return np.power(1 - alpha, row / 12)

    def factors_after(row, _column):
        # row 0 is the first month after the ref date
        return np.power(1 + alpha, (row + 1) / 12)

    # we place the ref point on the lower interval (months_before + 1 rows) but let it start from 0
    # in turn we let the upper interval start from 1
    lower = np.fromfunction(factors_before, (months_before + 1, samples), dtype=float)
    upper = np.fromfunction(factors_after, (months_after, samples), dtype=float)

    # the lower interval was built counting away from the ref date - flip it into chronological order
    a = np.vstack((np.flipud(lower), upper))

    if start_offset > 0:
        a = a[start_offset:]
    if end_offset > 0:
        a = a[:-end_offset]
    return a
class ParameterScenarioSet(object):
    """
    The set of all versions of a parameter for all the scenarios.

    Supports ``set[scenario_name]`` for reading and writing the parameter of a scenario.
    """
    # name under which the parameter without an explicit scenario is stored
    default_scenario = 'default'

    # the name of the parameters in this set
    parameter_name: str
    # scenario name -> parameter
    scenarios: Dict[str, 'Parameter']

    def __init__(self):
        self.scenarios = {}

    def add_scenario(self, parameter: 'Parameter', scenario_name: str = default_scenario):
        """
        Add a scenario for this parameter. An existing entry for the scenario is replaced.

        :param parameter: the parameter to store
        :param scenario_name: name of the scenario; the default scenario if omitted
        :return:
        """
        self.scenarios[scenario_name] = parameter

    def __getitem__(self, item):
        """Return the parameter stored for scenario ``item``; raises KeyError if there is none."""
        return self.scenarios[item]

    def __setitem__(self, key, value):
        """Store parameter ``value`` for scenario ``key``."""
        self.scenarios[key] = value
class ParameterRepository(object):
    """
    Contains all known parameter definitions (so that it is not necessary to re-read the excel file for repeat param accesses).

    The param definitions are independent from the sampling (the Parameter.__call__ method). A parameter caches
    the sample it generated; after clear_cache() the next access to __call__ creates a new sample.

    Internally, parameters are organised together with all the scenario variants in a single ParameterScenarioSet.
    """
    # parameter name -> all scenario variants of that parameter
    parameter_sets: Dict[str, ParameterScenarioSet]
    # tag -> parameter name -> parameters carrying the tag
    tags: Dict[str, Dict[str, Set[Parameter]]]

    def __init__(self):
        self.parameter_sets = defaultdict(ParameterScenarioSet)
        self.tags = defaultdict(lambda: defaultdict(set))

    def add_all(self, parameters: List[Parameter]):
        """Add every parameter of ``parameters`` via add_parameter."""
        for p in parameters:
            self.add_parameter(p)

    def clear_cache(self):
        """Drop the cached sample of every registered parameter."""
        for p_set in self.parameter_sets.values():
            for param in p_set.scenarios.values():
                param.cache = None

    def add_parameter(self, parameter: Parameter):
        """
        A parameter can have several scenarios. They are specified as a comma-separated list in a string.
        A parameter without scenario string is registered for the default scenario.

        NOTE(review): the same Parameter object is stored for each of its scenarios, so after this call
        ``parameter.scenario`` holds the last scenario of the list.

        :param parameter:
        :return:
        """
        scenario_string = parameter.source_scenarios_string
        if scenario_string:
            _scenarios = [i.strip() for i in scenario_string.split(',')]
            self.fill_missing_attributes_from_default_parameter(parameter)
        else:
            _scenarios = [ParameterScenarioSet.default_scenario]

        for scenario in _scenarios:
            parameter.scenario = scenario
            self.parameter_sets[parameter.name][scenario] = parameter

        # record all tags for this parameter
        if parameter.tags:
            for tag in (i.strip() for i in parameter.tags.split(',')):
                self.tags[tag][parameter.name].add(parameter)

    def fill_missing_attributes_from_default_parameter(self, param):
        """
        Empty fields in Parameter definitions in scenarios are populated with default values.

        E.g. in the example below, the source for the Power_TV variable in the 8K scenario would also be EnergyStar.

        +----------+----------+-----+--------+------------+
        | name     | scenario | val | tags   | source     |
        +----------+----------+-----+--------+------------+
        | Power_TV |          | 60  | UD, TV | EnergyStar |
        | Power_TV | 8K       | 85  | new_tag|            |
        +----------+----------+-----+--------+------------+

        **Note** tags must not differ. In the example above, the 8K scenario variable the tags value would be overwritten
        with the default value.

        If no default-scenario parameter of that name is registered a warning is logged and ``param`` is left
        unchanged.

        :param param:
        :return:
        """
        # exists() without scenario checks for the default scenario
        if not self.exists(param.name):
            logger.warning(
                f'No default value for param {param.name} found.')
            return

        default = self.parameter_sets[param.name][ParameterScenarioSet.default_scenario]
        inherited = ('unit', 'label', 'comment', 'source', 'tags')
        for att_name, att_value in default.__dict__.items():
            if att_name not in inherited:
                continue
            if att_name == 'tags' and default.tags != param.tags:
                logger.warning(
                    f'For param {param.name} for scenarios {param.source_scenarios_string}, '
                    f'tags is different from default parameter tags. Overwriting with default values.')
                setattr(param, att_name, att_value)
            if not getattr(param, att_name):
                logger.debug(
                    f'For param {param.name} for scenarios {param.source_scenarios_string}, '
                    f'populating attribute {att_name} with value {att_value} from default parameter.')
                setattr(param, att_name, att_value)

    def __getitem__(self, item) -> Parameter:
        """
        Return the default scenario parameter for a given variable name

        :param item: the name of the variable
        :return:
        """
        return self.get_parameter(item, scenario_name=ParameterScenarioSet.default_scenario)

    def get_parameter(self, param_name, scenario_name=ParameterScenarioSet.default_scenario) -> Parameter:
        """
        Return the parameter for a scenario, falling back to the default scenario if the parameter has no
        variant for the requested scenario.

        :param param_name: the name of the variable
        :param scenario_name: the requested scenario; a falsy value means the default scenario
        :return: the parameter
        :raises KeyError: if neither the requested nor the default scenario is registered for ``param_name``
        """
        # .get() does not trigger the defaultdict factory, so a failed lookup leaves no empty
        # ParameterScenarioSet behind in the repository
        scenario_set = self.parameter_sets.get(param_name)
        if scenario_set is not None:
            scenarios = scenario_set.scenarios
            if scenario_name in scenarios:
                return scenarios[scenario_name]
            if ParameterScenarioSet.default_scenario in scenarios:
                return scenarios[ParameterScenarioSet.default_scenario]
        raise KeyError(f"{param_name} not found")

    def find_by_tag(self, tag) -> Dict[str, Set[Parameter]]:
        """
        Get all registered dicts that are registered for a tag

        :param tag: str - single tag
        :return: a dict of {param name: set[Parameter]} with the parameters of all parameter names with a given tag
        """
        return self.tags[tag]

    def exists(self, param, scenario=None) -> bool:
        """
        Test whether a parameter is registered for a scenario.

        :param param: the name of the variable
        :param scenario: the scenario name; a falsy value means the default scenario
        :return: True if a parameter is registered under that name and scenario
        """
        scenario_set = self.parameter_sets.get(param)
        if scenario_set is None:
            return False
        return (scenario or ParameterScenarioSet.default_scenario) in scenario_set.scenarios

    def list_scenarios(self, param):
        """Return the scenario names registered for ``param``, or None if the name is unknown."""
        if param in self.parameter_sets:
            return self.parameter_sets[param].scenarios.keys()
        return None