• 1
  • 2
  • 3
  • 4
  • 5
阿里云主机ECS 首 页  »  帮助中心  »  云服务器  »  阿里云主机ECS
使用Pivot在Spark中重塑数据
发布日期:2016-4-6 17:4:32

  使用Pivot在Spark中重塑数据

  本文引自Andrew Ray博士在Silicon Valley Data Science网站上发表的博客,Andrew Ray博士对大数据有浓厚的兴趣并且他还有着丰富的Spark使用经验。Andrew同时也是一名活跃的Apache Spark源码贡献者,其源码贡献主要集中在Spark SQL与GraphX组件上。

  透视(pivot)数据功能是Spark 1.6众多新增的特性之一,通过使用DataFrame(目前支持Scala、Java和Python语言)创建透视表(pivot table)。透视可以视为一个聚合操作,通过该操作可以将一个(实际当中也可能是多个)具有不同值的分组列转置为各个独立的列。透视表在数据分析与报告中占有十分重要的地位,现在许多流行的数据操纵工具(如pandas、reshape2和Excel)与阿里云主机数据库(如MS SQL和Oracle 11g)都有透视数据的能力。在以前的博客当中我已做了简要介绍,在本文中,我将会更深入地给大家讲解具体细节。本博文的代码可以从这里下载。

  一、语法

  在透视操作进行pull请求的过程中,我进行了许多与之相关的研究,其中一项便是进行比较其它优秀工具的语法,目前透视语法格式多种多样,Spark 透视功能最主要的两个竞争对手是pandas(Python语言)与reshape2(R语言)。

  

图1

  

图2

  比如,我们想对A列和B列进行分组,然后在C列上进行透视操作并对D列数据进行求和,pandas的语法格式为 pivot_table(df, values=’D’, index=[‘A’, ‘B’], columns=[‘C’], aggfunc=np.sum),这虽然看起来有点冗长但表达还算清晰,使用reshape2的话,其语法格式为 dcast(df, A + B ~ C, sum),借助R语言公式的表达能力,这种语法十分紧凑,需要注意的是reshape2不需要指定求值列,因为它自身具备将剩余DataFrame列作为最终求值列的能力(当然也可能通过其它参数进行显式指定)。

  我们提出Spark透视操作自有的语法格式,它能够与DataFrame上现有其它聚合操作完美结合,同样是进行group/pivot/sum操作,在Spark中其语法为:df.groupBy(“A”, “B”).pivot(“C”).sum(“D”),显然这种语法格式非常直观,但这其中也有个值得注意的地方:为取得更好的性能,需要明确指定透视列对应的不同值,例如如果C列有两个不同的值(small 和 large),则性能更优的版本语法为: df.groupBy(“A”, “B”).pivot(“C”, Seq(“small”, “large”)).sum(“D”)。当然,这里给出的是Scala语言实现,使用Java语言和Python语言实现的话方法也是类似的。

  二、报告

  我们来看一些实际应用案例,假如你是一个大型零售商(例如我前任东家),销售数据具有标准交易格式而且你想制作一些汇总数据透视表。当然,你可以选择将数据聚合到可管理的大小,然后使用其它工具去制作最终的数据透视表(尽管初始聚合操作的粒度受限)。但现在你可以在Spark中进行所有操作(在进行这些操作之前需要进行若干IF判断),不幸的是没有大的零售商愿意将它们原始的销售数据共享给我们,因此我们将使用合成的阿里云数据进行演示,这里推荐使用TPC-DS 数据集,该数据集是我用过的数据集中比较好的一个,它的元数据(Schema)与实际零售数据非常相似。

  

图3

  由于TPC-DS是为进行不同大小的“大数据”数据库基准测试而合成的数据集,所以我们可以使用尺度因子(scale factors)决定最终想要生成的数据集大小。简单起见,这里的尺度因子为1,对应数据集大小为1GB。因为需求有点复杂,我使用了docker镜像以便大家可以跟着学习。假设我们想根据种类(category)与季度(quarter)对数据进行汇总,各季度数据最终以列的形式在数据透视表中展示,此时我们可以通过以下代码以完成上述需求(更真实的查询可能会有更多条件如时间范围等):

  (sql("""select *, concat('Q', d_qoy) as qoy

  from store_sales

  join date_dim on ss_sold_date_sk = d_date_sk

  join item on ss_item_sk = i_item_sk""")

  .groupBy("i_category")

  .pivot("qoy")

  .agg(round(sum("ss_sales_price")/1000000,2))

  .show)

  +-----------+----+----+----+----+

  | i_category| Q1| Q2| Q3| Q4|

  +-----------+----+----+----+----+

  | Books|1.58|1.50|2.84|4.66|

  | Women|1.41|1.36|2.54|4.16|

  | Music|1.50|1.44|2.66|4.36|

  | Children|1.54|1.46|2.74|4.51|

  | Sports|1.47|1.40|2.62|4.30|

  | Shoes|1.51|1.48|2.68|4.46|

  | Jewelry|1.45|1.39|2.59|4.25|

  | null|0.04|0.04|0.07|0.13|

  |Electronics|1.56|1.49|2.77|4.57|

  | Home|1.57|1.51|2.79|4.60|

  | Men|1.60|1.54|2.86|4.71|

  +-----------+----+----+----+----+

  请注意,为便于更清晰地比较,我们将销售额以百万元为单位并精确到小数点后两位,上面的数据结果有两个值得注意的地方:首先,四季度的数据明显要更多,这对任何熟悉零售业的人来说都很好理解;其次,同一季度中种类为null的异常结果值比较接近。遗憾的是,即使是如此优秀的合成数据集也与真实情况有出入,如果你有比该合成数据集更好且对公众开放的数据,请告诉我。

  三、特征生成

  第二个例子,我们来看预测模型中的特征生成,在实际应用中,数据集中的目标观测值常常以每条一行(称为长格式或窄数据)的格式进行组织。为了构建模型,我们首先需要将数据重塑,每个目标值重塑为一行,根据上下文该任务我们可以有多种方法来完成,其中的一种方法便是通过Spark中的透视操作来完成。这也许是其它工具如pandas、reshape2和Excel完成不了的,因为结果集可能有成百万甚至数十亿行。

  我将使用相对较小的MovieLens 1M数据集使实验能够容易地再现,该数据集中包含了由6040个用户,针对3952个电影生成的大约一百万个电影评级数据。我们尝试根据100个最流行的电影评级来预测用户的性别。在下面的例子当中,评级表有三列:user、 movie和rating。

  +----+-----+------+

  |user|movie|rating|

  +----+-----+------+

  | 11| 1753| 4|

  | 11| 1682| 1|

  | 11| 216| 4|

  | 11| 2997| 4|

  | 11| 1259| 3|

  ...

  为得到每用户一行格式的数据,我们将进行如下透视操作:

  val ratings_pivot = ratings.groupBy("user").pivot("movie", popular.toSeq).agg(expr("coalesce(first(rating),3)").cast("double"))

  上面代码中的popular变量为最流行的电影列表(通过评级数得到),同时我们将默认评级设为3,对于用户11,其影评数据结果如下:

  +----+----+---+----+----+---+----+---+----+----+---+...

  |user|2858|260|1196|1210|480|2028|589|2571|1270|593|...

  +----+----+---+----+----+---+----+---+----+----+---+...

  | 11| 5.0|3.0| 3.0| 3.0|4.0| 3.0|3.0| 3.0| 3.0|5.0|...

  +----+----+---+----+----+---+----+---+----+----+---+...

  上面的数据为建模时所需要的宽格式数据,完整例子代码在这。需要注意的是:我只使用了100个最流行的电影,因为当前的透视操作需要作用于成千上万个不同值,在当前的实现中其速度不是特别快。我们未来将解决这一问题。

  四、提示和技巧

  为了获取最好的性能,透视操作时需要指定透视列对应的不同值(如果你知道的话),否则Spark会立即启动一个job来确定这些值。另外,它们将按照排好的顺序放置,对大部分应用而言,这种做法是合理的,而对部分应用而言,如每周各天的顺序,这种做法是不合理的(如Friday, Monday, Saturday等) 。

  透视同其它正常的聚合操作一样,支持多个聚合表达式,只要将多个参数传递给agg方法即可,例如df.groupBy(“A”, “B”).pivot(“C”).agg(sum(“D”), avg(“D”))

  尽管语法上只允许对某一列进行透视,但你可以将多个列组合起来,其得到的结果与透视多个列得到的结果一样,例如:

  +----+----+---+----+----+---+----+---+----+----+---+...

  |user|2858|260|1196|1210|480|2028|589|2571|1270|593|...

  +----+----+---+----+----+---+----+---+----+----+---+...

  | 11| 5.0|3.0| 3.0| 3.0|4.0| 3.0|3.0| 3.0| 3.0|5.0|...

  +----+----+---+----+----+---+----+---+----+----+---+...

  最后,你可能会对在未明确指定时,对应透视列所允许的值最大数感兴趣,这也是捕获错误及避免内存溢出(OOM)场景的主要关注点。其配置键(config key)为spark.sql.pivotMaxValues,默认值为10000,你可能并不需要对其进行修改。

  五、实现

  透视函数的实现通过添加新的逻辑算子(o.a.s.sql.catalyst.plans.logical.Pivot)进行,该逻辑算子被新的分析器规则(o.a.s.sql.catalyst.analysis.Analyzer.ResolvePivot)翻译,该新分析器规则会将其翻译成带有许多带有if语句的聚合操作,每个透视值对应一个表达式。

  比如, df.groupBy(“A”, “B”).pivot(“C”, Seq(“small”, “large”)).sum(“D”)将被翻译成df.groupBy(“A”, “B”).agg(expr(“sum(if(C = ‘small’, D, null))”), expr(“sum(if(C = ‘large’, D, null))”))。你也可能直接这么用但这会使代码比较冗长且容易出错。

  六、未来的工作

  Spark中的透视功能仍然有待于提升,目前大量的工作集中在以下几个方面:

  1.   在R API和SQL语法(类似Oracle 11g和MS SQL)中添加透视功能,为用户提供更大的语言选择范围,使透视功能使用更简便。
  2.    添加逆透视的支持,其功能与透视操作相反
  3.   当透视列中的不同值较多时需要提升透视的速度,我目前正在想办法解决这一问题。