How to transform data with a sliding window over time series data in PySpark
Question
I am trying to extract features based on a sliding window over time series data.
In Scala, there seems to be a sliding function, based on this post and the documentation:
import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  .map(curSlice => (curSlice.sum / curSlice.size))
  .collect()
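To make the expected result concrete, here is a plain-Python sketch (no Spark involved) of what the Scala snippet computes. The helper name `sliding_means` is hypothetical, and the sketch assumes non-negative integers so that Python's floor division matches Scala's truncating `Int` division.

```python
# Plain-Python reference (no Spark): what sliding(3) followed by an
# integer mean produces on the values 1..100.
def sliding_means(values, n):
    """Return the integer mean of every contiguous window of size n."""
    windows = [values[i:i + n] for i in range(len(values) - n + 1)]
    # Scala's Int / Int truncates; floor division agrees for non-negative ints
    return [sum(w) // len(w) for w in windows]


result = sliding_means(list(range(1, 101)), 3)
print(result[:3], result[-1], len(result))  # [2, 3, 4] 99 98
```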
My question is: is there a similar function in PySpark? If not, how can we achieve a similar sliding window transformation?
Answer
As far as I can tell, the sliding function is not available from Python, and SlidingRDD is a private class that cannot be accessed outside MLlib.
If you want to use sliding on an existing RDD, you can create a poor man's sliding like this:
def sliding(rdd, n):
    assert n > 0

    def gen_window(xi, n):
        x, i = xi
        # Element i belongs to the windows starting at i, i - 1, ..., i - n + 1
        return [(i - offset, (i, x)) for offset in range(n)]

    return (
        rdd
        .zipWithIndex()  # Add index
        .flatMap(lambda xi: gen_window(xi, n))  # Generate pairs with offset
        .groupByKey()  # Group to create windows
        # Sort values to ensure order inside window and drop indices
        .mapValues(lambda vals: [x for (i, x) in sorted(vals)])
        .sortByKey()  # Sort to make sure we keep the original order
        .values()  # Get values
        .filter(lambda x: len(x) == n))  # Drop partial windows at the beginning and end
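To see how the index/offset trick builds the windows, here is a local, Spark-free imitation of the same pipeline. It is only an illustration: `simulate_sliding` is a hypothetical name, a `defaultdict` stands in for `groupByKey`, and plain sorting stands in for `sortByKey`.

```python
from collections import defaultdict


def simulate_sliding(values, n):
    """Local, Spark-free imitation of the RDD pipeline in sliding()."""
    assert n > 0
    # zipWithIndex(): pair each element with its position, as (x, i)
    indexed = [(x, i) for i, x in enumerate(values)]
    # flatMap(gen_window): element i joins the windows keyed i, i-1, ..., i-n+1
    pairs = [(i - offset, (i, x)) for x, i in indexed for offset in range(n)]
    # groupByKey(): collect the members of each window under its start index
    groups = defaultdict(list)
    for key, member in pairs:
        groups[key].append(member)
    # mapValues(sorted) + sortByKey() + values(): order inside and across windows
    windows = [[x for _, x in sorted(groups[k])] for k in sorted(groups)]
    # filter(): drop the partial windows at the beginning and the end
    return [w for w in windows if len(w) == n]


print(simulate_sliding([10, 20, 30, 40], 3))  # [[10, 20, 30], [20, 30, 40]]
```

Each element is copied n times, so this approach shuffles roughly n times the input size; that is the price of avoiding any partition-boundary bookkeeping.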
Alternatively, you can try something like this (with a little help from toolz):
from toolz.itertoolz import sliding_window, concat

def sliding2(rdd, n):
    assert n > 1

    def get_last_el(i, it):
        """Return the last n - 1 elements from the partition"""
        return [(i, list(it)[(-n + 1):])]

    def slide(i, it):
        """Prepend previous items and return sliding window"""
        return sliding_window(n, concat([last_items.value[i - 1], it]))

    def clean_last_items(last_items):
        """Adjust for empty or too small partitions"""
        # Partition 0 has no predecessor, so the first n - 1 windows are padded with None
        clean = {-1: [None] * (n - 1)}
        for i in range(rdd.getNumPartitions()):
            clean[i] = (clean[i - 1] + list(last_items[i]))[(-n + 1):]
        return {k: tuple(v) for k, v in clean.items()}

    last_items = rdd.context.broadcast(clean_last_items(
        rdd.mapPartitionsWithIndex(get_last_el).collectAsMap()))
    return rdd.mapPartitionsWithIndex(slide)
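The partition-boundary logic of sliding2 can be checked locally by modelling partitions as a list of lists. This is a sketch under stated assumptions: `simulate_sliding2` is a hypothetical name, and the local `sliding_window` is a stand-in for `toolz.itertoolz.sliding_window` (it returns a list rather than an iterator).

```python
def sliding_window(n, seq):
    """Stand-in for toolz.itertoolz.sliding_window: consecutive n-tuples."""
    items = list(seq)
    return [tuple(items[i:i + n]) for i in range(len(items) - n + 1)]


def simulate_sliding2(partitions, n):
    """Local imitation of sliding2(); partitions is a list of lists."""
    assert n > 1
    # get_last_el + collectAsMap: last n - 1 elements of each partition
    last_items = {i: p[-n + 1:] for i, p in enumerate(partitions)}
    # clean_last_items: carry the tail forward across empty or short partitions
    clean = {-1: [None] * (n - 1)}
    for i in range(len(partitions)):
        clean[i] = (clean[i - 1] + list(last_items[i]))[-n + 1:]
    # slide: prepend the previous tail, then window each partition independently
    out = []
    for i, part in enumerate(partitions):
        out.extend(sliding_window(n, list(clean[i - 1]) + part))
    return out


windows = simulate_sliding2([[1, 2], [], [3, 4, 5]], 3)
# [(None, None, 1), (None, 1, 2), (1, 2, 3), (2, 3, 4), (3, 4, 5)]
```

Unlike sliding, this version keeps the first n - 1 windows padded with None. Assuming the data itself contains no None values, filtering those windows out gives the same result as sliding, without the n-fold shuffle.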