Problem Scenario 2

Please read the introduction to this series: click on the Home link and read the intro before attempting to solve the problems.

A video walkthrough of the solution to this problem can be found here [Click here]

Click here for the video version of this series. This takes you to the YouTube playlist of videos.

Problem 2:
  1. Using Sqoop, copy the data available in the MySQL products table to the folder /user/cloudera/products on HDFS as a text file. Columns should be delimited by the pipe character '|'.
  2. Move all the files from the /user/cloudera/products folder to the /user/cloudera/problem2/products folder.
  3. Change the permissions of all the files under /user/cloudera/problem2/products so that the owner has read, write and execute permissions, the group has read and write permissions, and others have just read and execute permissions.
  4. Read the data in /user/cloudera/problem2/products and perform the following operations using a) the DataFrames API, b) Spark SQL and c) the RDD aggregateByKey method. Your solution should have three sets of steps. Sort the resultant dataset by category id.
    • filter such that your RDD/DF contains only products whose price is less than 100 USD
    • on the filtered data set, find the highest value in the product_price column under each category
    • on the filtered data set, also find the total number of products under each category
    • on the filtered data set, also find the average price of the products under each category
    • on the filtered data set, also find the minimum price of the products under each category
  5. Store the results in Avro files using snappy compression under these folders respectively:
    • /user/cloudera/problem2/products/result-df
    • /user/cloudera/problem2/products/result-sql
    • /user/cloudera/problem2/products/result-rdd
Solution: 
Try your best to solve the above scenario without going through the solution below. If you could, use the solution to compare your results. If you could not, I strongly recommend that you go through the concepts again (this time in more depth). Each step below provides a solution to the points mentioned in the problem scenario.

Step 1
sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username retail_dba \
--password cloudera \
--table products \
--as-textfile \
--target-dir /user/cloudera/products \
--fields-terminated-by '|';
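
As an optional sanity check, the number of imported records can be compared against "select count(*) from products" in MySQL, for example from spark-shell (run this before Step 2 moves the files):

scala> sc.textFile("/user/cloudera/products").count()   // should match the mysql row count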

Step 2
hadoop fs -mkdir /user/cloudera/problem2/
hadoop fs -mkdir /user/cloudera/problem2/products
hadoop fs -mv /user/cloudera/products/* /user/cloudera/problem2/products/

Step 3
# Read is 4, write is 2 and execute is 1.
# Owner:  read, write, execute = 4 + 2 + 1 = 7
# Group:  read, write          = 4 + 2     = 6
# Others: read, execute        = 4 + 1     = 5

hadoop fs -chmod 765 /user/cloudera/problem2/products/*


Step 4: 

scala> var products = sc.textFile("/user/cloudera/problem2/products").map(x=> {var d = x.split('|'); (d(0).toInt,d(1).toInt,d(2).toString,d(3).toString,d(4).toFloat,d(5).toString)});


scala> case class Product(productID:Integer, productCatID: Integer, productName: String, productDesc:String, productPrice:Float, productImage:String);


scala> var productsDF = products.map(x=> Product(x._1,x._2,x._3,x._4,x._5,x._6)).toDF();
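
As a quick optional check that the file parsed as expected, print the schema and a few rows before running the aggregations:

scala> productsDF.printSchema();
scala> productsDF.show(5);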




Step 4 - DataFrame API: 

scala> import org.apache.spark.sql.functions._
scala> var dataFrameResult = productsDF.filter("productPrice < 100").groupBy(col("productCatID")).agg(max(col("productPrice")).alias("max_price"),countDistinct(col("productID")).alias("tot_products"),round(avg(col("productPrice")),2).alias("avg_price"),min(col("productPrice")).alias("min_price")).orderBy(col("productCatID"));
scala> dataFrameResult.show();



Step 4 - Spark SQL: 

productsDF.registerTempTable("products");
var sqlResult = sqlContext.sql("select productCatID, max(productPrice) as maximum_price, count(distinct(productID)) as total_products, cast(avg(productPrice) as decimal(10,2)) as average_price, min(productPrice) as minimum_price from products where productPrice < 100 group by productCatID order by productCatID");
sqlResult.show();


Step 4 - RDD aggregateByKey: 

var rddResult = productsDF.filter("productPrice < 100").map(x=>(x(1).toString.toInt,x(4).toString.toDouble)).aggregateByKey((0.0,0.0,0,9999999999999.0))((x,y)=>(math.max(x._1,y),x._2+y,x._3+1,math.min(x._4,y)),(x,y)=>(math.max(x._1,y._1),x._2+y._2,x._3+y._3,math.min(x._4,y._4))).map(x=> (x._1,x._2._1,(x._2._2/x._2._3),x._2._3,x._2._4)).sortBy(_._1);
rddResult.collect().foreach(println);
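
The one-liner above packs the zero value, seqOp and combOp into a single expression. The sketch below is the same logic spread over several lines with the pieces named separately (the value names are mine); the accumulator layout is (max, sum, count, min):

// zero value: Double.MaxValue plays the role of the large seed for the minimum
val zero = (0.0, 0.0, 0, Double.MaxValue)

// seqOp: fold one product price into the running accumulator for its category
val seqOp = (acc: (Double, Double, Int, Double), price: Double) =>
  (math.max(acc._1, price), acc._2 + price, acc._3 + 1, math.min(acc._4, price))

// combOp: merge accumulators computed on different partitions
val combOp = (a: (Double, Double, Int, Double), b: (Double, Double, Int, Double)) =>
  (math.max(a._1, b._1), a._2 + b._2, a._3 + b._3, math.min(a._4, b._4))

// (category id, price) pairs for products under 100 USD, aggregated and sorted by category id
val rddResultReadable = productsDF.filter("productPrice < 100").
  map(row => (row(1).toString.toInt, row(4).toString.toDouble)).
  aggregateByKey(zero)(seqOp, combOp).
  map{ case (cat, (mx, sum, cnt, mn)) => (cat, mx, sum / cnt, cnt, mn) }.
  sortBy(_._1)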


Step 5: 



import com.databricks.spark.avro._;
sqlContext.setConf("spark.sql.avro.compression.codec","snappy")
dataFrameResult.write.avro("/user/cloudera/problem2/products/result-df");
sqlResult.write.avro("/user/cloudera/problem2/products/result-sql");
rddResult.toDF().write.avro("/user/cloudera/problem2/products/result-rdd");
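
To confirm the output, one of the Avro results can be read back in the same session (optional check, using the same spark-avro package):

var avroCheck = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem2/products/result-df");
avroCheck.show(5);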











Comments:

  1. Hi Arun,

    First of all thank you for such a confidence boosting blog on CCA175.
    I would like to highlight a small correction in aggregateByKey transformation.
    The default value of MIN_PRICE should not be 0.0. If it is 0.0, every math.min(a, b) will return 0.0. As a workaround it can be replaced by a higher value (e.g. 10000.0), which will ultimately be swapped out for the real minimum values in the process.

    Sample output:
    (58,241.0,170.0,4,115.0)
    (57,189.99,154.99,6,109.99)
    (56,159.99,159.99,2,159.99)
    (54,299.99,209.99,6,129.99)


    Regards,
    --Lax Dash

    1. Sorry about that. I actually did it the right way in the video; check the video tutorial between 31.08 and 31.10. The correct solution already exists there. I think I did not update the blog. Thanks for bringing it to my notice.
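
      A minimal REPL illustration of the seed issue (price value arbitrary):
      scala> math.min(0.0, 109.99)       // = 0.0    -> a 0.0 seed swallows every real price
      scala> math.min(10000.0, 109.99)   // = 109.99 -> a large seed gets replaced by the real minimum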

  2. In the exam, are you asked to solve a problem specifically with RDDs or DataFrames?
    Or is it only the output that matters?
    Please respond!!

    1. There is a very good possibility that you get questions where you are asked to complete missing lines of code in a Spark program written in Scala or Python. While the exam looks at the final output, filling in the missing lines of code is the fastest option; otherwise you will have to rewrite the code in your preferred way, which will not always result in the right answer and, most importantly, will be time consuming. I hope I answered your question.

    2. Thank you Arun Sir!!
      I am following your videos and have noticed that you are also using Scala.
      I was also wondering whether the code snippet in the exam will be in Python or Scala, or whether we get to choose up front which language we are going to use in the exam. I have been practicing for CCA175 with Scala.

    3. According to the exam website you will have to understand both Scala and Python. The majority of the problem solutions consist of using the API, and the function names are the same in Python and Scala. I recommend that you solve all these problems using Python as well, so that you are well prepared to answer any question that involves completing a portion of code written in either Scala or Python.

  3. var d = x.split('|'); -----> var d = x.split('\\|');

    1. Adarsh,

      thank you for following my posts. You don't have to escape when supplying a character literal for the pipe character: String.split(String) treats its argument as a regular expression (where an unescaped | means alternation), whereas split(Char) splits on the literal character. Here are all the variations. I hope the examples below help you remember what works and what does not for your exam. All the very best.

      -- WITH ESCAPE, PASSING A STRING LITERAL INSIDE DOUBLE QUOTES

      scala> sc.textFile("/user/cloudera/products").map(x=> {var d =x.split("\\|"); (d(0),d(1),d(2))}).take(1).foreach(println);
      Output - (1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U)

      -- WITHOUT ESCAPE, PASSING A STRING LITERAL INSIDE DOUBLE QUOTES
      scala> sc.textFile("/user/cloudera/products").map(x=> {var d =x.split("|"); (d(0),d(1),d(2))}).take(1).foreach(println);
      Output - (,1,|)

      -- WITHOUT ESCAPE, PASSING A CHAR LITERAL IN SINGLE QUOTES

      scala> sc.textFile("/user/cloudera/products").map(x=> {var d =x.split('|'); (d(0),d(1),d(2))}).take(1).foreach(println);
      Output - (1,2,Quest Q64 10 FT. x 10 FT. Slant Leg Instant U)

    2. Thanks a lot Arun.. it really helped me ..

    4. Thanks Arun for the very thorough problem statements; I am preparing for CCA175 using your blog. Here I have a question
      on your answer to Adarsh.

      So the safe way to split is this:
      x.split("\\|") ??

      I have always used x.split(",") for my comma-delimited fields so far, so why doesn't x.split("|") work for us? Please explain.

    5. So Arun, can you explain which of the three split variants is the best one to use?

  5. scala> var productsDF = products.map(x=> Product(x._1,x._2,x._3,x._4,x._5,x._6)).toDF();

    gives error on quickstart 5.10
    "Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@5fa9ef3d, see the next exception for details.
    at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
    at org.apache.derby.impl.jdbc.SQLExceptionFactory.wrapArgsForTransportAcrossDRDA(Unknown Source)
    ... 135 more
    Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /home/cloudera/metastore_db.
    at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
    at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
    at org.apache.derby.impl.store.raw.data.BaseDataFileFactory.privGetJBMSLockOnDB(Unknown Source)
    at org.apache.derby.impl.store.raw.data.BaseDataFileFactory.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    "
    Could you help me to resolve it?

    1. Hi team, can anyone help me fix the below error in the quickstart VM?
      "Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /home/cloudera/metastore_db.
      at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)

  6. Hi Arun! The question asks the user to sort the data by product_category_id (which by default I understand to be ascending order), whereas in your solution under "SPARK SQL" you have ordered the result set in descending order.

    " product_category_id desc"

    In the DF result set, you have sorted in ascending order - "orderBy(col("productCategory"));"

    Am I missing anything here, or should your answer be updated in this blog?

  7. Hi Arun,
    while executing step 4 I am getting the below error about a managed memory leak. My Spark version is Spark 1.6.0 and I am doing this in the Cloudera quickstart VM.
    As it is an error, it is not showing the result either.

    Can you please tell me how to change this ERROR to WARN so that I can see the result.

    dataFrameResult.show();

    17/11/09 13:19:01 WARN memory.TaskMemoryManager: leak 8.3 MB memory from org.apache.spark.unsafe.map.BytesToBytesMap@129e18f
    17/11/09 13:19:01 ERROR executor.Executor: Managed memory leak detected; size = 8650752 bytes, TID = 7
    17/11/09 13:19:01 ERROR executor.Executor: Exception in task 0.0 in stage 10.0 (TID 7)

  8. Hi Arun,

    Thank you for sharing. I found some issues which look like typos when trying your solutions.

    Please help to correct me if I am wrong, thanks.

    STEP 4 :
    scala> var products = sc.textFile("/user/cloudera/problem2/products/").map(x=> {var d = x.split('|'); (d(0).toInt,d(1).toInt,d(2).toString,d(3).toString,d(4).toFloat,d(5).toString)});


    Step 4-Data Frame Api:
    scala> var dataFrameResult = productsDF.filter("productPrice < 100").groupBy(col("productCatID")).agg(max(col("productPrice")).alias("max_price"),countDistinct(col("productID")).alias("tot_products"),round(avg(col("productPrice")),2).alias("avg_price"),min(col("productPrice")).alias("min_price")).orderBy(col("productCatID"));

    Step 4- spark sql:
    var sqlResult = sqlContext.sql("SELECT productCatID, max(productPrice) AS maximum_price, count(distinct(productID)) AS total_products, cast(avg(productPrice) as DECIMAL(10,2)) AS average_price, min(productPrice) AS minimum_price FROM products WHERE productPrice < 100 GROUP BY productCatID ORDER BY productCatID asc");

  9. In the second question, if you are making both the directories at that moment itself, what data is getting transferred when you use the -mv command?

  10. Hi Arun, one important query: for calculating the average price you are rounding it off to two decimal places even though it's not explicitly mentioned in the problem statement. So, will it be mentioned explicitly during the exam, or do we need to take care of it on our own?

  11. Hi Arun, in the real exam, can I use pyspark?
    Or is only spark-shell allowed?

  12. Hi Arun,

    Thanks a ton for the wonderful blog, it is helping us a lot to get certified. I think you have missed filtering out the products with a price lower than 100 USD in the RDD solution. Correct me if I'm wrong.

  13. Hi,
    could you please let me know whether in CCA175 they provide any IDE like Eclipse for the Scala or Python code snippets?
    Please let me know.

    Thanks
    Vinay Mallela

    1. Yes, you will have access to IntelliJ, Eclipse, Sublime Text, etc. It will all be inside the virtual machine you will have to log into. You can also open browsers there, which will only show you links to all the standard Cloudera documentation.

      Just make sure you are at ease and comfortable working with the Cloudera VM; it will be a similar environment.

  17. Hi Arun, this blog is awesome. When I was doing problem 2, in step 4 for Spark SQL I found that the query refers to the columns product_category_id, product_price and product_id, but we created the DataFrame with productCatID, productPrice and productID. Whenever I run the SQL query with product_category_id, product_price and product_id I get the error "org.apache.spark.sql.AnalysisException: cannot resolve 'product_price' given input columns: [productDesc, productImage, productPrice, productCatID, productID, productName];"

    So please check and correct it if it is wrong.

    Thank you.

  19. In the aggregateByKey answer I believe you will have to add the filter < 100 as well. In the video it is correct.

  20. Hi, can you please provide the sample data of the products table? I have not understood the use of countDistinct applied to the product_id column, because this column is auto-increment and its value is always going to be unique.

    1. Hi, you are right, there is no need to use count distinct on product_id; count is enough.

  21. I got this error, can anyone help me?
    scala> val result_DF = productdf.filter("productPrice < 100").groupBy(col("productCategoryID")).agg(col(max("productPrice")).alias(max_price),countDistinct(col("productID")).alias(total_products),col(min("productPrice")).alias(min_price),col(avg("productPrice")).alias(moyenne_price))
    :39: error: type mismatch;
    found : org.apache.spark.sql.Column
    required: String
    val result_DF = productdf.filter("productPrice < 100").groupBy(col("productCategoryID")).agg(col(max("productPrice")).alias(max_price),countDistinct(col("productID")).alias(total_products),col(min("productPrice")).alias(min_price),col(avg("productPrice")).alias(moyenne_price))

  22. For Pyspark

    prdd=sc.textFile("/user/cloudera11/products")
    from pyspark.sql import Row

    pdf=prdd.map(lambda x:Row(product_id=x.split("|")[0],product_category_id=x.split("|")[1],product_name=x.split("|")[2],product_price=x.split("|")[4])).toDF()

    pdf.registerTempTable("PT")

    pd=sqlContext.sql("select product_category_id, count(product_id),max(product_price),avg(product_price),min(product_price) from PT \
    where product_price < 100 \
    group by product_category_id \
    order by product_category_id")

    sqlContext.setConf("spark.sql.avro.compression.codec","Snappy")

    pd.write.format("com.databricks.spark.avro").save("/user/cloudera/problem2/products/result-df")

  24. Hi Arun,
    I am trying to save a DF to an Avro file with snappy compression. However, I didn't notice any size difference between the Avro file without snappy compression and the one with snappy compression. Also, in your example the file size is almost the same after snappy compression. I am using the code below...

    top5CustPerMonthMapSortedMapDF.coalesce(2).write.format("com.databricks.spark.avro").save("/user/sushital1997/DRPROB6/avro/top_5_cust")

    AVRO compression:
    sqlContext.setConf("spark.sql.avro.compression.codec","snappy")
    top5CustPerMonthMapSortedMapDF.save("/user/sushital1997/DRPROB6/avro/top_5_cust1_snappy","com.databricks.spark.avro")

    I also tried with the below code:
    top5CustPerMonthMapSortedMapDF.coalesce(2).write.format("com.databricks.spark.avro").save("/user/sushital1997/DRPROB6/avro/top_5_cust_snappy")

    I heard that snappy doesn't work with DataFrames. So what should I do during the certification exam if they ask to save a DF to Avro with snappy compression?

  27. Hi Arun, thanks a lot for this blog. I am trying to do this problem in pyspark and I am getting the following error: I am able to create the dataframe, but I am not able to use dataframe.show(); if I do so, I get UnicodeEncodeError: 'ascii' codec can't encode character u'\xe9' in position 31: ordinal not in range(128). I tried all options but no use; RDD.take(n) is working, though.

    1. Hi Siva,
      can you tell me for which line of code you are getting the error?

    2. In pyspark 2.4.6, the only workaround I found for it was to bring the data to the driver node and process the encoding on that node.

      I used the approach described on https://markhneedham.com/blog/2015/05/21/python-unicodeencodeerror-ascii-codec-cant-encode-character-uxfc-in-position-11-ordinal-not-in-range128/

      So the final result is:

      import sys
      reload(sys)
      sys.setdefaultencoding("utf-8")

      local = spark.read.jdbc("jdbc:mysql://mysql.XXXX.com.br",table="XXXX.products",\
      properties={"user":"XXXX","password":"XXXX","port":"3306"}).collect()
      local2 = map(list,local)
      local3=map(lambda r: ["|".join(map(str,r))],local2)
      sc.parallelize(local3,1).map(lambda x: x[0]).saveAsTextFile("/user/marceltoledo/arun/cloudera/products")

  28. Hi Arun, I am getting an error like this:

    var productsDF = products.map(x=> Product(x._1,x._2,x._3,x._4,x._5,x._6)).toDF();

    error: value map is not a member of object product

  29. Just a suggestion: it may be good to post the result as well so that we can all verify. This is what I am getting. Is anyone else also getting this? I want to know if my results are ok or if I need to fix anything.

    +-------------------+-----------------+---------------------------+-----------------+-----------------+
    |product_category_id|max_product_price|total_products_per_category|avg_price_per_cat|min_price_per_cat|
    +-------------------+-----------------+---------------------------+-----------------+-----------------+
    | 59| 70.0| 10| 38.6| 28.0|
    | 58| 60.0| 13| 43.69| 22.0|
    | 57| 99.99| 18| 59.16| 0.0|
    | 56| 90.0| 22| 60.5| 9.99|
    | 55| 85.0| 24| 31.5| 9.99|
    | 54| 99.99| 18| 61.43| 34.99|
    | 53| 99.99| 8| 91.24| 69.99|
    | 52| 65.0| 19| 28.74| 10.0|
    | 51| 79.97| 10| 40.99| 28.0|
    | 50| 60.0| 14| 53.71| 34.0|
    | 49| 99.99| 13| 74.22| 19.98|
    | 48| 49.98| 7| 35.7| 19.98|
    | 47| 99.95| 14| 44.63| 21.99|
    | 46| 49.98| 9| 34.65| 19.98|
    | 45| 99.99| 7| 55.42| 27.99|
    | 44| 99.98| 15| 62.19| 21.99|
    | 43| 99.0| 1| 99.0| 99.0|
    | 42| 0.0| 1| 0.0| 0.0|
    | 41| 99.99| 37| 31.24| 9.59|
    | 40| 24.99| 24| 24.99| 24.99|
    | 39| 34.99| 12| 23.74| 19.99|
    | 38| 99.95| 14| 46.34| 19.99|
    | 37| 51.99| 24| 36.41| 4.99|
    | 36| 24.99| 24| 19.2| 12.99|
    | 35| 79.99| 9| 34.21| 9.99|
    | 34| 99.99| 9| 83.88| 34.99|
    | 33| 99.99| 19| 58.46| 10.8|
    | 32| 99.99| 10| 48.99| 19.99|
    | 31| 99.99| 7| 88.56| 79.99|
    | 30| 99.99| 7| 95.42| 68.0|
    | 29| 99.99| 15| 60.73| 4.99|
    | 27| 90.0| 24| 44.16| 18.0|
    | 26| 90.0| 24| 41.66| 18.0|
    etc etc

  30. Can you show the RDD solution using pyspark?

  32. Hi Arun,
    In the DF answer, why have you used countDistinct('product_id) to find the number of products? This is a product master table; do we need to use that? Please advise.

  34. for problem 2 :

    why can't I use the below commands to achieve this?

    hadoop fs -cp /user/cloudera/products /user/cloudera/problem2/products
    hadoop fs -rm /user/cloudera/products

