
Introduction to Apache Spark's Core API (Part II)


We take a look at the functions and methods available for working with pair RDDs, with multiple examples in Python.


Hello coders, I hope you are all doing well.

In my previous post, Introduction to Apache Spark's Core API (Part I), I covered the methods of plain RDDs and promised to explain the functions and methods for pair RDDs with multiple example snippets. So here it is!

To create a pair RDD, please refer to my previous post. With the help of that tutorial, you can create a pair RDD (here, I am assuming that ordersPairRdd is my pair RDD, with order_id as the key and the full order record as the value).
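If you don't have the pair RDDs handy, here is a minimal sketch of how ordersPairRdd and orderItemsPairRDD might be built; the input paths are assumptions for illustration, so adjust them to your environment:

  # Assumed input paths; adjust to your environment.
  orders = sc.textFile("/data/retail_db/orders")
  orderItems = sc.textFile("/data/retail_db/order_items")

  # Key each record by its order_id (the first field of orders,
  # the second field of order_items), keeping the full line as the value.
  ordersPairRdd = orders.map(lambda line: (line.split(",")[0], line))
  orderItemsPairRDD = orderItems.map(lambda line: (line.split(",")[1], line))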

  • Pair RDD Core APIs

    • ordersPairRdd.join(otherRDD)

      • This method returns an RDD containing all pairs of elements with matching keys in ordersPairRdd and otherRDD. The default join works the same as an inner join in SQL.

      ordersPairRdd.first()
      # (u'1', u'1,2013-07-25 00:00:00.0,11599,CLOSED')
      orderItemsPairRDD.first()
      # (u'1', u'1,1,957,1,299.98,299.98')
      ordersJoinOrderItems = ordersPairRdd.join(orderItemsPairRDD)
      ordersJoinOrderItems.first()
      # (u'1', (u'1,2013-07-25 00:00:00.0,11599,CLOSED', u'1,1,957,1,299.98,299.98'))


    • ordersPairRdd.leftOuterJoin(otherRDD)

      • This method performs a left outer join on ordersPairRdd and otherRDD.

      • Let's say ordersPairRdd has (k, v) and otherRDD has (k, w); then the resultant RDD will have (k, (v, w)), or (k, (v, None)) if no element in otherRDD has key k.

      ordersLeftJoinOrderItems = ordersPairRdd.leftOuterJoin(orderItemsPairRDD)
      ordersLeftJoinOrderItems.first()
      # (u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'87999,35236,365,1,59.99,59.99'))
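      Since every order in the sample output above happens to have matching order items, the None case isn't visible; a minimal sketch with toy data shows it:

      x = sc.parallelize([("a", 1), ("b", 4)])
      y = sc.parallelize([("a", 2)])
      # "b" has no match in y, so its right side is None.
      sorted(x.leftOuterJoin(y).collect())
      # [('a', (1, 2)), ('b', (4, None))]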


    • ordersPairRdd.rightOuterJoin(otherRDD)

      • This method performs a right outer join on ordersPairRdd and otherRDD.

      • Let's say ordersPairRdd has (k, v) and otherRDD has (k, w); then the resultant RDD will have (k, (v, w)), or (k, (None, w)) if no element in ordersPairRdd has key k.

      ordersRightJoinOrderItems = ordersPairRdd.rightOuterJoin(orderItemsPairRDD)
      ordersRightJoinOrderItems.first()
      # (u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'87999,35236,365,1,59.99,59.99'))
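      Again, a toy sketch makes the (None, w) case visible:

      x = sc.parallelize([("a", 1)])
      y = sc.parallelize([("a", 2), ("b", 4)])
      # "b" has no match in x, so its left side is None.
      sorted(x.rightOuterJoin(y).collect())
      # [('a', (1, 2)), ('b', (None, 4))]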


    • ordersPairRdd.fullOuterJoin(otherRDD)

      • This method performs a full outer join on ordersPairRdd and otherRDD.

      • Let's say ordersPairRdd has (k, v) and otherRDD has (k, w); then the resultant RDD will have (k, (v, w)), plus (k, (v, None)) if no element in otherRDD has key k, and (k, (None, w)) if no element in ordersPairRdd has key k.

      ordersFullJoinOrderItems = ordersPairRdd.fullOuterJoin(orderItemsPairRDD)
      ordersFullJoinOrderItems.first()
      # (u'35236', (u'35236,2014-02-27 00:00:00.0,3987,PENDING', u'87999,35236,365,1,59.99,59.99'))
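      And a toy sketch showing both None cases at once:

      x = sc.parallelize([("a", 1), ("b", 4)])
      y = sc.parallelize([("a", 2), ("c", 8)])
      # "b" exists only in x; "c" exists only in y.
      sorted(x.fullOuterJoin(y).collect())
      # [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]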


    • ordersPairRdd.countByKey()

      • This method is used to count the number of elements for each key and then return the result to the master as a dictionary.

      rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
      rdd.countByKey().items()
      # [('a', 2), ('b', 1)]


    • ordersPairRdd.groupByKey()

      • This method groups the values for each key in the RDD into a single sequence.

      • In the case of aggregation over each key, using reduceByKey or aggregateByKey will provide much better performance than groupByKey.

      ordersPairRdd.groupByKey().mapValues(list).collect()
      #[(u'18065', [u'18065,2013-11-13 00:00:00.0,5684,PROCESSING']), (u'34148', [u'34148,2014-02-20 00:00:00.0,10198,COMPLETE'])]
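      To illustrate the performance note above, here is a small sketch (with made-up data) computing the same per-key sum both ways; reduceByKey combines values within each partition before shuffling, which is why it is preferred:

      data = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

      # groupByKey shuffles every value, then sums on the reducer side.
      data.groupByKey().mapValues(sum).collect()
      # [('a', 2), ('b', 1)]

      # reduceByKey sums within each partition first, shuffling less data.
      data.reduceByKey(lambda x, y: x + y).collect()
      # [('a', 2), ('b', 1)]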


    • ordersPairRdd.reduceByKey(func)

      • This method merges the values for each key with the help of an associative and commutative reduce function.

      pairRdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
      test = pairRdd.reduceByKey(lambda x, y: x+y)
      test.collect()
      # [('a', 2), ('b', 1)]


    • ordersPairRdd.aggregateByKey(zeroValue, seqFunc, combFunc)

      • According to the Spark programming guide, this function aggregates the values of each key, using the given combine functions and a neutral "zero value." This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two Us. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.

      pairRdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
      pairRdd.aggregateByKey(0, lambda x, y: x + y, lambda r1, r2: r1 + r2).collect()
      # [('a', 2), ('b', 1)]
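      In the example above, U and V are both integers; a small sketch where the result type differs (collecting each key's values into a list) makes the seqFunc/combFunc split clearer. The data here is made up for illustration:

      pairRdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
      # zeroValue: an empty list (type U) per key.
      # seqFunc:  append a value (type V) to the list within a partition.
      # combFunc: concatenate partial lists across partitions.
      sorted(pairRdd.aggregateByKey([], lambda acc, v: acc + [v], lambda a, b: a + b).collect())
      # [('a', [1, 2]), ('b', [1])]  (element order may vary with partitioning)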


    • ordersPairRdd.combineByKey(createCombiner, mergeValue, mergeCombiner)

      • This method is used to combine the values for each key using a custom set of aggregation functions.

      • For example, if an RDD is of type (k, v), this function can return an RDD of type (k, w), where v and w can be different types:

        • createCombiner - turns a v into a w.

        • mergeValue - merges a v into a w.

        • mergeCombiner - combines two w's into a single w.

        x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

        def add(a, b):
            return a + str(b)

        sorted(x.combineByKey(str, add, add).collect())
        # [('a', '11'), ('b', '1')]
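        As a slightly fuller sketch (with made-up data), here is the classic per-key average, where each of the three functions plays a distinct role:

        scores = sc.parallelize([("a", 10), ("b", 4), ("a", 20)])
        avgByKey = scores.combineByKey(
            lambda v: (v, 1),                          # createCombiner: v -> (sum, count)
            lambda acc, v: (acc[0] + v, acc[1] + 1),   # mergeValue: fold a v into (sum, count)
            lambda a, b: (a[0] + b[0], a[1] + b[1])    # mergeCombiner: add two (sum, count) pairs
        ).mapValues(lambda t: t[0] / float(t[1]))
        sorted(avgByKey.collect())
        # [('a', 15.0), ('b', 4.0)]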


    • ordersPairRdd.sortByKey(ascending=True)

      • As the name indicates, this function is used to sort the pair RDD based on the key.

      tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
      sc.parallelize(tmp).sortByKey().first()
      # ('1', 3)
      
      tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
      tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
      sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
      # [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
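      For a descending sort, pass ascending=False; a quick sketch on the same toy data:

      sc.parallelize(tmp).sortByKey(ascending=False).first()
      # ('d', 4)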


These are all the core functions and methods for pair RDDs, i.e. RDDs of key-value pairs. Stay tuned for the next post in this series.

Thanks for reading and happy coding!

Topics:
spark api, big data, apache spark tutorial, python tutorial, pair rdd tutorial
