pyspark.RDD.combineByKey

RDD.combineByKey(createCombiner: Callable[[V], U], mergeValue: Callable[[U, V], U], mergeCombiners: Callable[[U, U], U], numPartitions: Optional[int] = None, partitionFunc: Callable[[K], int] = <function portable_hash>) → pyspark.rdd.RDD[Tuple[K, U]][source]

Generic function to combine the elements for each key using a custom set of aggregation functions.

Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a “combined type” C.

To avoid memory allocation, both mergeValue and mergeCombiners are allowed to modify and return their first argument instead of creating a new C.

In addition, users can control the partitioning of the output RDD.

New in version 0.7.0.

Parameters
createCombinerfunction

a function to turns a V into a C

mergeValuefunction

a function to merge a V into a C

mergeCombinersfunction

a function to combine two C’s into a single one

numPartitionsint, optional

the number of partitions in new RDD

partitionFuncfunction, optional, default portable_hash

function to compute the partition index

Returns
RDD

a RDD containing the keys and the aggregated result for each key

Notes

V and C can be different – for example, one might group an RDD of type

(Int, Int) into an RDD of type (Int, List[Int]).

Examples

>>>
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
>>> def to_list(a):
...     return [a]
...
>>> def append(a, b):
...     a.append(b)
...     return a
...
>>> def extend(a, b):
...     a.extend(b)
...     return a
...
>>> sorted(rdd.combineByKey(to_list, append, extend).collect())
[('a', [1, 2]), ('b', [1])]