Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Open sidebar
energy
bmon
Compare Revisions
81f6c1ec943c9c80bbc06e8d1939b4fe7dcdf39a...bdddc06e887a88c1aab21208d8795fcbdde9fe2e
Commits (4)
Rolling Averaging: increase minimum window size
· 23b246cd
alaskamapscience
authored
May 07, 2021
23b246cd
New weighted resampling function for timeseries
· fa67ffe6
alaskamapscience
authored
May 28, 2021
fa67ffe6
Improvements to weighted averaging
· 5e71ddd7
alaskamapscience
authored
Jun 07, 2021
5e71ddd7
Merge branch 'master' into bare-server
· bdddc06e
Alan Mitchell
authored
Jun 21, 2021
bdddc06e
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
131 additions
and
8 deletions
+131
-8
bmsapp/data_util.py
bmsapp/data_util.py
+116
-3
bmsapp/reports/timeseries.py
bmsapp/reports/timeseries.py
+12
-4
bmsapp/views_api_v2.py
bmsapp/views_api_v2.py
+3
-1
No files found.
bmsapp/data_util.py
View file @
bdddc06e
...
...
@@ -95,7 +95,119 @@ def histogram_from_series(pandas_series):
# to 4 significant figures
return
list
(
zip
(
avg_bins
,
cts
))
def
resample_timeseries
(
pandas_dataframe
,
averaging_hours
,
use_rolling_averaging
=
False
,
drop_na
=
True
):
def resample_timeseries(pandas_dataframe, averaging_hours, use_rolling_averaging=False, drop_na=True, interp_method='pad'):
    '''
    Averages a time-indexed dataframe over periods of 'averaging_hours' hours
    and returns the result as a new dataframe.

    A handful of common period lengths map to named pandas rules (see the
    table below); any other value is converted to a whole number of minutes,
    truncating a fractional minute.

    With 'use_rolling_averaging' False, the work is handed to
    weighted_resample_timeseries() together with the rule, the label offset
    and 'interp_method'.  With it True, a time-based rolling mean is computed
    on the sorted data, rows earlier than one rule-length after the first
    timestamp are discarded, and the remaining index labels are moved back by
    the label offset.

    If 'drop_na' is True, rows containing any NaN value are removed from the
    result.
    '''
    to_offset = pd.tseries.frequencies.to_offset

    # fractional periods are truncated to the lesser minute
    whole_minutes = int(averaging_hours * 60)
    half_minutes = int(averaging_hours * 30)

    # resampling rule and label offset for the commonly requested periods
    named_periods = {
        0.5: {'rule': '30min', 'loffset': '15min'},
        1: {'rule': '1H', 'loffset': '30min'},
        2: {'rule': '2H', 'loffset': '1H'},
        4: {'rule': '4H', 'loffset': '2H'},
        8: {'rule': '8H', 'loffset': '4H'},
        24: {'rule': '1D', 'loffset': '12H'},
        168: {'rule': '1W', 'loffset': '108H'},
        720: {'rule': '1M', 'loffset': '16D'},
        8760: {'rule': 'AS', 'loffset': '6M'},
    }
    if averaging_hours in named_periods:
        period = named_periods[averaging_hours]
    else:
        period = {
            'rule': str(whole_minutes) + 'min',
            'loffset': str(half_minutes) + 'min',
        }

    if use_rolling_averaging:
        # rolling mean over a time window of the requested length
        rolled = pandas_dataframe.sort_index().rolling(str(whole_minutes) + 'min').mean()

        # keep only rows at least one rule-length after the first timestamp
        trim_start = pandas_dataframe.index.min() + to_offset(period['rule'])
        dfResampled = rolled[trim_start:]

        # move each label back toward the center of its averaging window
        dfResampled.index = dfResampled.index - to_offset(period['loffset'])
    else:
        # averaging weighted by the duration of each value
        dfResampled = weighted_resample_timeseries(
            pandas_dataframe, period['rule'], period['loffset'], interp_method)

    return dfResampled.dropna() if drop_na else dfResampled
def weighted_resample_timeseries(pandas_dataframe, averaging, offset, interp_method='pad'):
    '''
    Returns a new pandas dataframe that is resampled at the specified
    interval, where each period's value is the average of the readings
    weighted by how long each reading was in effect.

    Parameters:
        pandas_dataframe: dataframe with a datetime index; every column is
            averaged.  Rows with a repeated index value are discarded
            (the first one is kept).
        averaging: pandas resampling rule for the averaging period,
            e.g. '60min'.
        offset: optional pandas offset string added to the result labels so
            they sit at the center of each period; a falsy value leaves the
            labels at the start of each period.
        interp_method: how values are estimated at the period breaks.
            'pad'/'ffill' and 'bfill'/'backfill' carry a neighboring reading
            forward/backward; anything else is passed to
            DataFrame.interpolate().  Note that pandas' 'linear' method
            treats rows as equally spaced rather than using the timestamps.

    Returns:
        A dataframe with the same columns as the input, indexed by period.
        A period in which a column has no known value is NaN for that column.
    '''
    one_hour = pd.Timedelta(1, 'hour')

    # discard duplicate values in the index
    data = pandas_dataframe[~pandas_dataframe.index.duplicated()]

    # timestamps marking the break between averaging periods, placed 1 ns
    # ahead of each period start so they fall at the very end of the
    # preceding (right-closed) period
    break_index = data.resample(averaging, label='left').asfreq().index - pd.Timedelta(1, 'ns')

    # calculate the average number of hours in the resampling periods
    period_hours = (break_index.to_series().diff() / one_hour).mean()
    if np.isfinite(period_hours) and period_hours > 0:
        # limit interpolation to 24 hours or 1 averaging period
        interp_limit = int(24 / period_hours) + 1
    else:
        # data spans a single period, so the period length cannot be measured;
        # at most one break row needs filling
        interp_limit = 1

    # also create breaks that are shifted 1 day forward
    shifted_breaks = break_index + pd.Timedelta(1, 'D')
    shifted_breaks = shifted_breaks[shifted_breaks < data.index.max()]

    # insert empty (NaN) rows at every break that is not already a data row.
    # reindexing on the union replaces DataFrame.append, which no longer
    # exists in pandas 2.0+
    full_index = data.index.union(break_index).union(shifted_breaks)
    df = data.reindex(full_index).sort_index()

    # estimate values at the inserted rows
    if interp_method in ('pad', 'ffill'):
        df = df.ffill(limit=interp_limit)
    elif interp_method in ('bfill', 'backfill'):
        df = df.bfill(limit=interp_limit)
    else:
        df = df.interpolate(method=interp_method, limit=interp_limit)

    # calculate the 'duration' of each row in hours (based on difference from
    # previous timestamp)
    duration_hours = df.index.to_series().diff() / one_hour
    duration_hours = duration_hours.clip(upper=24)   # maximum weight = 24 hours

    # shift the values down one row to place them at the end of each period:
    # every row now holds the value that applied during its duration
    held = df.shift(1)

    # multiply values by weights; a duration only counts as weight for a
    # column when the value held during it is known, otherwise unknown
    # stretches would drag the average toward zero
    weighted = held.multiply(duration_hours, axis='index')
    weights = held.notna().astype(float).multiply(duration_hours, axis='index')

    # discard the last row (kept from the original implementation)
    weighted = weighted.iloc[:-1]
    weights = weights.iloc[:-1]

    # resample and calculate the weighted average for each time period
    weighted_sums = weighted.resample(rule=averaging, closed='right', label='left').sum()
    weight_sums = weights.resample(rule=averaging, closed='right', label='left').sum()
    dfResampled = weighted_sums.div(weight_sums)   # 0 / 0 gives NaN for empty periods

    if offset:
        # offset the index label to the center of each period
        dfResampled.index = dfResampled.index + pd.tseries.frequencies.to_offset(offset)

    return dfResampled
def decimate_timeseries(df, bin_count=1000, col=None):
    '''
    Decimates a dataframe to limit the maximum number of datapoints for plotting.

    The index range is divided into 'bin_count' equal-width bins and, for each
    bin, only the rows holding the largest and the smallest value of column
    'col' are kept, so peaks and troughs survive the reduction.

    Parameters:
        df: dataframe to reduce; its index is what gets binned.
        bin_count: number of bins.  Dataframes with no more than
            2 * bin_count rows are returned unchanged (the same object).
        col: column used to pick the extremes; defaults to the first column.

    Returns:
        The selected rows of 'df' (all columns), sorted by index.
    '''
    if len(df) <= bin_count * 2:
        return df

    if col is None:
        # default to using the first column
        col = df.columns[0]

    # bin the index values
    bin_ids = np.asarray(
        pd.cut(df.index, bins=bin_count, labels=np.arange(0, bin_count)).astype(int))

    # rows without a value cannot be an extreme; leaving them out also keeps
    # idxmax / idxmin from producing a NaN label for a bin that is all NaN
    values = df[col]
    has_value = values.notna().to_numpy()
    grouped = values[has_value].groupby(bin_ids[has_value])

    # index labels of the max and min value in each bin
    extreme_labels = pd.concat([grouped.idxmax(), grouped.idxmin()])

    # select by index label so a row that is both max and min of its bin
    # appears once, while rows at different times are kept even when their
    # values are equal (DataFrame.drop_duplicates ignores the index and
    # would collapse them)
    return df[df.index.isin(extreme_labels)].sort_index()
def
old_resample_timeseries
(
pandas_dataframe
,
averaging_hours
,
use_rolling_averaging
=
False
,
drop_na
=
True
):
'''
Returns a new pandas dataframe that is resampled at the specified "averaging_hours"
interval. If the 'averaging_hours' parameter is fractional, the averaging time
...
...
@@ -123,10 +235,11 @@ def resample_timeseries(pandas_dataframe, averaging_hours, use_rolling_averaging
else
:
# resample to consistent interval
original_interval
=
pandas_dataframe
.
index
.
to_series
().
diff
().
quantile
(.
05
)
new_df
=
pandas_dataframe
.
resample
(
rule
=
original_interval
).
median
()
new_df
=
pandas_dataframe
.
resample
(
rule
=
original_interval
).
median
()
.
ffill
(
limit
=
1
)
# apply the rolling averaging
new_df
=
new_df
.
rolling
(
int
(
pd
.
Timedelta
(
hours
=
averaging_hours
)
/
original_interval
),
center
=
True
,
min_periods
=
1
).
mean
()
window_size
=
int
(
pd
.
Timedelta
(
hours
=
averaging_hours
)
/
original_interval
)
new_df
=
new_df
.
rolling
(
window_size
,
center
=
True
,
min_periods
=
int
(
window_size
*
0.75
)
+
1
).
mean
()
# downsample the result if there are more than 1000 values
if
len
(
new_df
)
>
1000
:
...
...
bmsapp/reports/timeseries.py
View file @
bdddc06e
...
...
@@ -36,8 +36,6 @@ class TimeSeries(basechart.BaseChart):
averaging_hours
=
0
use_rolling_averaging
=
False
# determine the start time for selecting records and loop through the selected
# records to get the needed dataset
st_ts
,
end_ts
=
self
.
get_ts_range
()
...
...
@@ -58,7 +56,15 @@ class TimeSeries(basechart.BaseChart):
if
not
df
.
empty
:
# perform average (if requested)
if
averaging_hours
:
df
=
bmsapp
.
data_util
.
resample_timeseries
(
df
,
averaging_hours
,
use_rolling_averaging
)
if
sensor
.
unit
.
measure_type
==
'state'
:
# if the sensor has defined states
interp_method
=
'pad'
else
:
interp_method
=
'linear'
df
=
bmsapp
.
data_util
.
resample_timeseries
(
df
,
averaging_hours
,
use_rolling_averaging
,
interp_method
=
interp_method
)
# limit the number of points to plot
df
=
bmsapp
.
data_util
.
decimate_timeseries
(
df
,
bin_count
=
1000
,
col
=
'val'
)
# create lists for plotly
if
np
.
absolute
(
df
.
val
.
values
).
max
()
<
10000
:
...
...
@@ -81,7 +87,9 @@ class TimeSeries(basechart.BaseChart):
# if the sensor has defined states, make the series a Step type series.
if
sensor
.
unit
.
measure_type
==
'state'
:
if
not
averaging_hours
:
series_opt
[
'line'
][
'shape'
]
=
'hv'
series
.
append
(
series_opt
)
# Set the basic chart options
...
...
bmsapp/views_api_v2.py
View file @
bdddc06e
...
...
@@ -9,6 +9,8 @@ from dateutil.parser import parse
from
django.views.decorators.csrf
import
csrf_exempt
from
bmsapp
import
models
from
bmsapp.data_util
import
weighted_resample_timeseries
from
bmsapp.readingdb
import
bmsdata
from
bmsapp.views_api_v1
import
(
fail_payload
,
...
...
@@ -156,7 +158,7 @@ def sensor_readings(request):
# if averaging is requested, do it!
if
averaging
and
len
(
df
)
>
0
:
df
=
df
.
resample
(
rule
=
averaging
,
loffset
=
label_offset
,
label
=
'left'
).
mean
(
).
dropna
(
how
=
'all'
)
df
=
weighted_resample_timeseries
(
df
,
averaging
,
label_offset
).
dropna
(
how
=
'all'
)
# make a dictionary that is formatted with orientation 'split', which is the most
# compact form to send the DataFrame
...
...