使用 Apache Spark 将键值对缩减为键列表对

我正在编写一个 Spark 应用程序,并希望将一组键值对 (K, V1), (K, V2), ..., (K, Vn) 组合成一个键-多值对(K, [V1, V2, ..., Vn]).我觉得我应该能够使用具有某种风味的 reduceByKey 函数来做到这一点:

I am writing a Spark application and want to combine a set of Key-Value pairs (K, V1), (K, V2), ..., (K, Vn) into one Key-Multivalue pair (K, [V1, V2, ..., Vn]). I feel like I should be able to do this using the reduceByKey function with something of the flavor:

My_KMV = My_KV.reduce(lambda a, b: a.append([b]))


The error that I get when this occurs is:


'NoneType' object has no attribue 'append'.

我的键是整数,值 V1,...,Vn 是元组.我的目标是使用键和值列表(元组)创建一对.

My keys are integers and values V1,...,Vn are tuples. My goal is to create a single pair with the key and a list of the values (tuples).




Input type and output type of reduce must be the same, therefore if you want to aggregate a list, you have to map the input to lists. Afterwards you combine the lists into one list.


您需要一种将列表合并为一个列表的方法.Python 提供了一些组合列表的方法.

You'll need a method to combine lists into one list. Python provides some methods to combine lists.

append 修改第一个列表,并且总是返回 None.

append modifies the first list and will always return None.

x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]

extend 做同样的事情,但解开列表:

extend does the same, but unwraps lists:

x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]

这两种方法都返回 None,但您需要一个返回组合列表的方法,因此只需 使用加号.

Both methods return None, but you'll need a method that returns the combined list, therefore just use the plus sign.

x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]


file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" ")) 
         .map(lambda actor: (actor.split(",")[0], actor))  

         # transform each value into a list
         .map(lambda nameTuple: (nameTuple[0], [ nameTuple[1] ])) 

         # combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
         .reduceByKey(lambda a, b: a + b)



也可以使用 combineByKey 来解决这个问题,它在内部用于实现 reduceByKey,但它更复杂并且 在 Spark 中使用一种专门的按键组合器可以更快"一个>.对于上面的解决方案,您的用例已经足够简单了.

It's also possible to solve this with combineByKey, which is used internally to implement reduceByKey, but it's more complex and "using one of the specialized per-key combiners in Spark can be much faster". Your use case is simple enough for the upper solution.


也可以使用 groupByKey、但它会减少并行化,因此对于大数据集可能会慢得多.

It's also possible to solve this with groupByKey, but it reduces parallelization and therefore could be much slower for big data sets.

