Commits (4)
...@@ -95,7 +95,119 @@ def histogram_from_series(pandas_series): ...@@ -95,7 +95,119 @@ def histogram_from_series(pandas_series):
# to 4 significant figures # to 4 significant figures
return list(zip(avg_bins, cts)) return list(zip(avg_bins, cts))
def resample_timeseries(pandas_dataframe, averaging_hours, use_rolling_averaging=False, drop_na=True, interp_method='pad'):
    '''
    Returns a new pandas dataframe that is resampled at the specified "averaging_hours"
    interval. If the 'averaging_hours' parameter is fractional, the averaging time
    period is truncated to the lesser minute (but never to less than one minute).

    Parameters:
        pandas_dataframe: dataframe of readings; the index is treated as timestamps.
        averaging_hours: length of the averaging period, in hours.
        use_rolling_averaging: if True, a trailing rolling mean is calculated for
            every row instead of one duration-weighted average per period.
        drop_na: if True, rows with any NaN values are dropped from the result.
        interp_method: interpolation method handed to weighted_resample_timeseries()
            (not used for rolling averaging).

    For some reason the pandas resampling sometimes fails if the datetime index is timezone aware...
    '''

    # resampling rule and label offset (period start -> period center) for the
    # standard averaging periods
    interval_lookup = {
        0.5: {'rule': '30min', 'loffset': '15min'},
        1: {'rule': '1H', 'loffset': '30min'},
        2: {'rule': '2H', 'loffset': '1H'},
        4: {'rule': '4H', 'loffset': '2H'},
        8: {'rule': '8H', 'loffset': '4H'},
        24: {'rule': '1D', 'loffset': '12H'},
        168: {'rule': '1W', 'loffset': '108H'},
        720: {'rule': '1M', 'loffset': '16D'},
        8760: {'rule': 'AS', 'loffset': '6M'}
    }

    # whole minutes in the averaging period; clamped to one minute because a
    # '0min' rule or rolling window is rejected by pandas
    window_minutes = max(1, int(averaging_hours * 60))

    params = interval_lookup.get(averaging_hours)
    if params is None:
        # non-standard period: build the rule and the half-period offset in minutes
        params = {
            'rule': str(window_minutes) + 'min',
            'loffset': str(int(averaging_hours * 30)) + 'min',
        }

    to_offset = pd.tseries.frequencies.to_offset

    if not use_rolling_averaging:
        # apply averaging weighted by duration
        dfResampled = weighted_resample_timeseries(pandas_dataframe, params['rule'], params['loffset'], interp_method)
    else:
        # apply rolling averaging over a time-based window
        dfResampled = pandas_dataframe.sort_index().rolling(str(window_minutes) + 'min').mean()

        # discard the rows that are less than one averaging period after the first reading
        first_kept = pandas_dataframe.index.min() + to_offset(params['rule'])
        dfResampled = dfResampled.loc[first_kept:]

        # offset the index label to the center of each period
        dfResampled.index = dfResampled.index - to_offset(params['loffset'])

    if drop_na:
        dfResampled = dfResampled.dropna()

    return dfResampled
def weighted_resample_timeseries(pandas_dataframe, averaging, offset, interp_method='pad'):
    '''
    Returns a new pandas dataframe that is resampled at the specified
    interval. Each value is weighted by the length of time (at most 24 hours)
    until the next row, and only time for which a value is known is counted.
    Periods with no known values are returned as NaN.

    Parameters:
        pandas_dataframe: dataframe of numeric readings with a datetime index.
        averaging: pandas resampling rule for the averaging period (e.g. '60min').
        offset: pandas offset string added to the period labels, which otherwise
            mark the start of each period. Nothing is added if it is None or empty.
        interp_method: how values are estimated at the period breaks. 'pad' (or
            'ffill') carries the last value forward; any other value is passed
            to DataFrame.interpolate().
    '''
    one_hour = pd.Timedelta(hours=1)

    # discard duplicate values in the index
    pandas_dataframe = pandas_dataframe[~pandas_dataframe.index.duplicated()]
    value_columns = pandas_dataframe.columns

    # timestamps for the breaks between averaging periods, placed 1 nanosecond
    # before each period starts so they sort ahead of a reading on the boundary
    period_starts = pandas_dataframe.resample(averaging, label='left').asfreq().index
    window_breaks = period_starts - pd.Timedelta(nanoseconds=1)

    # calculate the average number of hours in the resampling periods
    averaging_hours = (window_breaks.to_series().diff() / one_hour).mean()
    if pd.notna(averaging_hours) and averaging_hours > 0:
        # limit interpolation to 24 hours or 1 averaging period
        interp_limit = int(24 / averaging_hours) + 1
    else:
        # fewer than two breaks, so the spacing is undefined
        interp_limit = 1

    # also create breaks that are shifted 1 day forward
    shifted_breaks = window_breaks + pd.Timedelta(days=1)
    shifted_breaks = shifted_breaks[shifted_breaks < pandas_dataframe.index.max()]
    window_breaks = window_breaks.union(shifted_breaks)

    # insert new (empty) rows at the breaks
    df = pandas_dataframe.reindex(pandas_dataframe.index.union(window_breaks)).sort_index()

    # interpolate values
    if interp_method in ('pad', 'ffill'):
        # same result as interpolate(method='pad'), which newer pandas deprecates
        df = df.ffill(limit=interp_limit)
    else:
        df = df.interpolate(method=interp_method, limit=interp_limit)

    # calculate the 'duration' weight for each row (hours since the previous timestamp)
    value_duration = df.index.to_series().diff() / one_hour
    value_duration = value_duration.clip(upper=24)  # maximum weight = 24 hours

    # shift the values so that each one sits at the end of the period it covers
    df = df.shift(1)

    # multiply values by weights, counting weight only where a value is known
    has_value = df.notna().astype(float)
    weighted_values = df.multiply(value_duration, axis='index')
    weights = has_value.multiply(value_duration, axis='index')

    # discard the last row
    # NOTE(review): after the shift this row holds the second-to-last value with
    # a known duration, so dropping it may be an off-by-one. Kept as it was - confirm.
    weighted_values = weighted_values[:-1]
    weights = weights[:-1]

    # resample and calculate the weighted average for each time period
    # (0 / 0 gives NaN for a period with no known values)
    value_totals = weighted_values.resample(rule=averaging, closed='right', label='left').sum()
    weight_totals = weights.resample(rule=averaging, closed='right', label='left').sum()
    dfResampled = value_totals[value_columns].div(weight_totals[value_columns])

    if offset:
        # offset the index label to the center of each period
        dfResampled.index = dfResampled.index + pd.tseries.frequencies.to_offset(offset)

    return dfResampled
def decimate_timeseries(df,bin_count=1000,col=None):
    '''
    Decimates a dataframe to limit the maximum number of datapoints for plotting.

    The index range is split into 'bin_count' equal-width bins and only the rows
    holding the maximum and the minimum of 'col' in each bin are kept. A dataframe
    with no more than 2 * bin_count rows is returned unchanged.

    Parameters:
        df: dataframe to decimate.
        bin_count: number of bins to split the index range into.
        col: column used to find the extremes; defaults to the first column.
    '''
    if len(df) <= bin_count * 2:
        return df

    if col is None:
        # default to using the first column
        col = df.columns[0]

    # bin the index values
    bin_numbers = pd.cut(df.index, bins=bin_count, labels=np.arange(0, bin_count)).astype(int)
    bins = df.groupby(bin_numbers)

    # keep the max and min value in each bin
    maximums = df.loc[bins[col].idxmax()]
    minimums = df.loc[bins[col].idxmin()]
    extremes = pd.concat([maximums, minimums])

    # a row can be both the max and the min of its bin; remove the repeat by
    # index label, because comparing row values would also throw away
    # different timestamps that happen to have the same reading
    return extremes[~extremes.index.duplicated()].sort_index()
def old_resample_timeseries(pandas_dataframe, averaging_hours, use_rolling_averaging=False, drop_na=True):
''' '''
Returns a new pandas dataframe that is resampled at the specified "averaging_hours" Returns a new pandas dataframe that is resampled at the specified "averaging_hours"
interval. If the 'averaging_hours' parameter is fractional, the averaging time interval. If the 'averaging_hours' parameter is fractional, the averaging time
...@@ -123,10 +235,11 @@ def resample_timeseries(pandas_dataframe, averaging_hours, use_rolling_averaging ...@@ -123,10 +235,11 @@ def resample_timeseries(pandas_dataframe, averaging_hours, use_rolling_averaging
else: else:
# resample to consistent interval # resample to consistent interval
original_interval = pandas_dataframe.index.to_series().diff().quantile(.05) original_interval = pandas_dataframe.index.to_series().diff().quantile(.05)
new_df = pandas_dataframe.resample(rule=original_interval).median() new_df = pandas_dataframe.resample(rule=original_interval).median().ffill(limit=1)
# apply the rolling averaging # apply the rolling averaging
new_df = new_df.rolling(int(pd.Timedelta(hours=averaging_hours) / original_interval),center=True,min_periods=1).mean() window_size = int(pd.Timedelta(hours=averaging_hours) / original_interval)
new_df = new_df.rolling(window_size,center=True,min_periods=int(window_size * 0.75) + 1).mean()
# downsample the result if there are more than 1000 values # downsample the result if there are more than 1000 values
if len(new_df) > 1000: if len(new_df) > 1000:
......
...@@ -36,8 +36,6 @@ class TimeSeries(basechart.BaseChart): ...@@ -36,8 +36,6 @@ class TimeSeries(basechart.BaseChart):
averaging_hours = 0 averaging_hours = 0
use_rolling_averaging = False use_rolling_averaging = False
# determine the start time for selecting records and loop through the selected # determine the start time for selecting records and loop through the selected
# records to get the needed dataset # records to get the needed dataset
st_ts, end_ts = self.get_ts_range() st_ts, end_ts = self.get_ts_range()
...@@ -58,7 +56,15 @@ class TimeSeries(basechart.BaseChart): ...@@ -58,7 +56,15 @@ class TimeSeries(basechart.BaseChart):
if not df.empty: if not df.empty:
# perform average (if requested) # perform average (if requested)
if averaging_hours: if averaging_hours:
df = bmsapp.data_util.resample_timeseries(df,averaging_hours,use_rolling_averaging) if sensor.unit.measure_type == 'state':
# if the sensor has defined states
interp_method = 'pad'
else:
interp_method = 'linear'
df = bmsapp.data_util.resample_timeseries(df,averaging_hours,use_rolling_averaging,interp_method=interp_method)
# limit the number of points to plot
df = bmsapp.data_util.decimate_timeseries(df, bin_count=1000,col='val')
# create lists for plotly # create lists for plotly
if np.absolute(df.val.values).max() < 10000: if np.absolute(df.val.values).max() < 10000:
...@@ -81,7 +87,9 @@ class TimeSeries(basechart.BaseChart): ...@@ -81,7 +87,9 @@ class TimeSeries(basechart.BaseChart):
# if the sensor has defined states, make the series a Step type series. # if the sensor has defined states, make the series a Step type series.
if sensor.unit.measure_type == 'state': if sensor.unit.measure_type == 'state':
series_opt['line']['shape'] = 'hv' if not averaging_hours:
series_opt['line']['shape'] = 'hv'
series.append( series_opt ) series.append( series_opt )
# Set the basic chart options # Set the basic chart options
......
...@@ -9,6 +9,8 @@ from dateutil.parser import parse ...@@ -9,6 +9,8 @@ from dateutil.parser import parse
from django.views.decorators.csrf import csrf_exempt from django.views.decorators.csrf import csrf_exempt
from bmsapp import models from bmsapp import models
from bmsapp.data_util import weighted_resample_timeseries
from bmsapp.readingdb import bmsdata from bmsapp.readingdb import bmsdata
from bmsapp.views_api_v1 import ( from bmsapp.views_api_v1 import (
fail_payload, fail_payload,
...@@ -156,7 +158,7 @@ def sensor_readings(request): ...@@ -156,7 +158,7 @@ def sensor_readings(request):
# if averaging is requested, do it! # if averaging is requested, do it!
if averaging and len(df) > 0: if averaging and len(df) > 0:
df = df.resample(rule = averaging, loffset = label_offset, label = 'left').mean().dropna(how='all') df = weighted_resample_timeseries(df,averaging,label_offset).dropna(how='all')
# make a dictionary that is formatted with orientation 'split', which is the most # make a dictionary that is formatted with orientation 'split', which is the most
# compact form to send the DataFrame # compact form to send the DataFrame
......