Problem Scenario 4

Please read the introduction to this series first: click on the Home link and read the intro before attempting to solve the problems.

A video walk-through of the solution to this problem can be found here [Click here]

Click here for the video version of this series; it takes you to the YouTube playlist of videos.

In this problem, we will focus on conversion between different file formats using Spark or Hive. This is a very important examination topic. I recommend that you master the data file conversion techniques and understand their limitations. You should have an alternate method of accomplishing a solution in case your primary method fails for any unknown reason. For example, if saving the result as a text file with snappy compression fails in Spark, you should be able to accomplish the same thing using Hive. In this blog/video I am going to walk you through some scenarios that cover alternative ways of dealing with the same problem.

Problem 4:
  1. Import the orders table from mysql as a text file to the destination /user/cloudera/problem5/text. Fields should be terminated by a tab character ("\t") and lines should be terminated by a newline character ("\n").
  2. Import the orders table from mysql into hdfs to the destination /user/cloudera/problem5/avro. The file should be stored as an avro data file.
  3. Import the orders table from mysql into hdfs to the folder /user/cloudera/problem5/parquet. The file should be stored as a parquet file.
  4. Transform/Convert data-files at /user/cloudera/problem5/avro and store the converted file at the following locations and file formats
    • save the data to hdfs using snappy compression as parquet file at /user/cloudera/problem5/parquet-snappy-compress
    • save the data to hdfs using gzip compression as text file at /user/cloudera/problem5/text-gzip-compress
    • save the data to hdfs using no compression as sequence file at /user/cloudera/problem5/sequence
    • save the data to hdfs using snappy compression as text file at /user/cloudera/problem5/text-snappy-compress
  5. Transform/Convert data-files at /user/cloudera/problem5/parquet-snappy-compress and store the converted file at the following locations and file formats
    • save the data to hdfs using no compression as parquet file at /user/cloudera/problem5/parquet-no-compress
    • save the data to hdfs using snappy compression as avro file at /user/cloudera/problem5/avro-snappy
  6. Transform/Convert data-files at /user/cloudera/problem5/avro-snappy and store the converted file at the following locations and file formats
    • save the data to hdfs using no compression as json file at /user/cloudera/problem5/json-no-compress
    • save the data to hdfs using gzip compression as json file at /user/cloudera/problem5/json-gzip
  7. Transform/Convert data-files at  /user/cloudera/problem5/json-gzip and store the converted file at the following locations and file formats
    • save the data as comma-separated text using gzip compression at /user/cloudera/problem5/csv-gzip
  8. Using spark, access the data at /user/cloudera/problem5/sequence and store it back to hdfs, using no compression, as an ORC file at the destination /user/cloudera/problem5/orc

Solution: 

Try your best to solve the above scenario without going through the solution below. If you could, use the solution to compare your result. If you could not, I strongly recommend that you go through the concepts again (this time in more depth). Each step below provides a solution to the corresponding point in the problem scenario.

Step 1: 

sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --password cloudera --username retail_dba --table orders --as-textfile --fields-terminated-by '\t' --target-dir /user/cloudera/problem5/text -m 1
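
If you want to eyeball the imported data (a quick sketch, not required by the problem), the tab-delimited files can be read straight from the spark shell:

sc.textFile("/user/cloudera/problem5/text").take(5).foreach(println)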

Step 2: 

sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --password cloudera --username retail_dba --table orders --as-avrodatafile --target-dir /user/cloudera/problem5/avro -m 1

Step 3: 

sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --password cloudera --username retail_dba --table orders --as-parquetfile --target-dir /user/cloudera/problem5/parquet -m 1
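
As an optional check (a sketch only), the parquet import can be read back from the spark shell to confirm the record count:

sqlContext.read.parquet("/user/cloudera/problem5/parquet").count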

Step 4: 

// read.avro and write.avro are provided by the spark-avro package; if it is not already on the classpath, launch spark-shell with the databricks spark-avro package
import com.databricks.spark.avro._
var dataFile = sqlContext.read.avro("/user/cloudera/problem5/avro");
sqlContext.setConf("spark.sql.parquet.compression.codec","snappy");
dataFile.repartition(1).write.parquet("/user/cloudera/problem5/parquet-snappy-compress");
dataFile.map(x=> x(0)+"\t"+x(1)+"\t"+x(2)+"\t"+x(3)).saveAsTextFile("/user/cloudera/problem5/text-gzip-compress",classOf[org.apache.hadoop.io.compress.GzipCodec]);
dataFile.map(x=> (x(0).toString,x(0)+"\t"+x(1)+"\t"+x(2)+"\t"+x(3))).saveAsSequenceFile("/user/cloudera/problem5/sequence");

The command below may fail on some Cloudera VMs. If the Spark command fails, use the sqoop command to accomplish the same thing. Remember that you need to exit the spark shell to run the sqoop command.

dataFile.map(x=> x(0)+"\t"+x(1)+"\t"+x(2)+"\t"+x(3)).saveAsTextFile("/user/cloudera/problem5/text-snappy-compress",classOf[org.apache.hadoop.io.compress.SnappyCodec]);

sqoop import --table orders --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username retail_dba --password cloudera --as-textfile -m 1 --target-dir /user/cloudera/problem5/text-snappy-compress --compress --compression-codec org.apache.hadoop.io.compress.SnappyCodec
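
As a quick sanity check for this step (a sketch, not part of the required output), you can read the converted data back in the spark shell and compare record counts with the source:

println(dataFile.count);
println(sqlContext.read.parquet("/user/cloudera/problem5/parquet-snappy-compress").count);
println(sc.textFile("/user/cloudera/problem5/text-gzip-compress").count);  // spark reads the .gz part files transparently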

Step 5: 

var parquetDataFile = sqlContext.read.parquet("/user/cloudera/problem5/parquet-snappy-compress")
sqlContext.setConf("spark.sql.parquet.compression.codec","uncompressed");
parquetDataFile.write.parquet("/user/cloudera/problem5/parquet-no-compress");
sqlContext.setConf("spark.sql.avro.compression.codec","snappy");

parquetDataFile.write.avro("/user/cloudera/problem5/avro-snappy");
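
Note that a snappy-compressed avro file does not get a .snappy extension; to confirm the compression worked, compare the file sizes against the uncompressed avro from step 2, or simply read the output back (a sketch, assuming the spark-avro import from step 4 is still in scope):

sqlContext.read.avro("/user/cloudera/problem5/avro-snappy").count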

Step 6: 

var avroData = sqlContext.read.avro("/user/cloudera/problem5/avro-snappy");
avroData.toJSON.saveAsTextFile("/user/cloudera/problem5/json-no-compress");
avroData.toJSON.saveAsTextFile("/user/cloudera/problem5/json-gzip",classOf[org.apache.hadoop.io.compress.GzipCodec]);

Step 7 : 

var jsonData = sqlContext.read.json("/user/cloudera/problem5/json-gzip");
jsonData.map(x=>x(0)+","+x(1)+","+x(2)+","+x(3)).saveAsTextFile("/user/cloudera/problem5/csv-gzip",classOf[org.apache.hadoop.io.compress.GzipCodec])
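
One caveat: when spark infers the schema from json, the columns may not come back in the original table order (fields are typically sorted by name), so if the exact column order matters it is safer to select the columns by name before concatenating. A minimal sketch, assuming the usual orders column names, that can be used in place of the map above:

jsonData.select("order_id", "order_date", "order_customer_id", "order_status").map(r => r.mkString(",")).saveAsTextFile("/user/cloudera/problem5/csv-gzip", classOf[org.apache.hadoop.io.compress.GzipCodec])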

Step 8: 

// To read the sequence file, you need to identify the key and value classes
// to be used while loading the sequence file as a Spark RDD.
// In a new terminal, copy the sequence file to the local file system:
hadoop fs -get /user/cloudera/problem5/sequence/part-00000
// Read the first 300 characters to identify the two classes to be used:
cut -c-300 part-00000

// In the spark shell, run the following:
var seqData = sc.sequenceFile("/user/cloudera/problem5/sequence/",classOf[org.apache.hadoop.io.Text],classOf[org.apache.hadoop.io.Text]);
seqData.map(x=>{var d = x._2.toString.split("\t"); (d(0),d(1),d(2),d(3))}).toDF().write.orc("/user/cloudera/problem5/orc");
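
Note that write.orc requires a HiveContext; without one you will see an error like "The ORC data source can only be used with HiveContext". On the quickstart VM the spark-shell's default sqlContext is usually already a HiveContext; if yours is not, a minimal sketch of creating one:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._  // needed for toDF on the mapped RDD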

Contribution from Raphael L. Nascimento:

You can also use the method below to set the compression codec.
sqlContext.sql("SET spark.sql.parquet.compression.codec=snappy")

74 comments:

  1. Hi Arun, firstly I'd like to thank you for your great work here, these problems are very helpful.

    I have one question:

    After set the compress for snappy using (sqlContext.setConf("spark.sql.parquet.compress.codec","snappy")).

    And writing the content as parquet (orders.write.parquet("/user/cloudera/problem4/parquet-snappy-compress")).

    It seems the content was stored with default compression (gz).

    hdfs dfs -ls /user/cloudera/problem4/parquet-snappy-compress

    part-r-00009-681f9ff9-1709-4513-bec7-01787ff85b09.gz.parquet


    Did you face the same issue ?

    I currently using cloudera-quickstart-vm-5.8.0-0, spark version 1.6.0.


    Best Regards
    Raphaeel Nascimento

    ReplyDelete
    Replies
    1. Hi Raphael. You are facing the issue because of a misspelled word. The key is spark.sql.parquet.compression.codec, not spark.sql.parquet.compress.codec, so changing compress to compression should solve your problem. Also, I recommend that you remember the names and values of these keys as they come in very handy during the exam.

      Thank you for the nice compliment though.

      Delete
    2. I'm so sorry, thank you. There is so many packages to remember and configs, I think I got confused with the package name of the compression codecs, which use the word "compress".

      Tks.

      Delete
    3. No problem. BTW, why did you delete your comment? The alternate procedure you showed is also very valid. Maybe you can activate your comment again so that the viewers can benefit from both ways. I am giving credit to you and posting your method as well in a few minutes. Check the blog in 10 minutes or so and you will see your procedure reflected in the solution section.

      Delete
  2. This comment has been removed by the author.

    ReplyDelete
  3. Besides the fact I made the confusion with compress word, there is a alternative way to set the compression codec:

    sqlContext.sql("SET spark.sql.parquet.compression.codec=snappy")


    Regards
    Raphael.

    ReplyDelete
  4. Another approach to save time in test to transform DF in RDD is (depends on the number of dataframe' columns):

    Use the concat_ws sql function.

    Pyhton:

    from pyspark.sql.functions import *

    orders.select(concat_ws("\t", *orders.columns)).map(lambda rec: (rec[0])).saveAsTextFile("path", "compress-codec")

    orders.select(orders[0], concat_ws("\t", *orders.columns)).map(lambda rec: (int(rec[0]), rec[1])).saveAsSequenceFile("path", "compress-codec")

    Regards
    Raphael.

    ReplyDelete
  5. Hi Arun,

    Thanks for the blog.
    How is an ORC file saved with compression ?

    Is this valid ?
    sqlContext.setConf("spark.sql.orc.compression.codec","SomeCodec")
    SomeDF.write.orc("somePath")

    It does write the file in ORC format but the data seems to be in uncompressed format without any compression codec extension.

    Can you please clarify this ?


    Regards,


    ReplyDelete
    Replies
    1. there is no configuration called spark.sql.orc.compression.codec that i am aware of

      instead of below

      sqlContext.setConf("spark.sql.orc.compression.codec","SomeCodec")

      Can you try below instead and let me know?

      sqlContext.setConf("spark.io.compression.codec","snappy")

      Delete
    2. Thanks for the suggestion, but the compression wasn't applied i guess.
      Please see the hdfs file size with and without snappy compression.


      [cloudera@quickstart ~]$ hadoop fs -ls /user/cloudera/CCApractice/problem4/orc_snappy_arun
      Found 2 items
      -rw-r--r-- 1 cloudera cloudera 0 2017-05-25 21:32 /user/cloudera/CCApractice/problem4/orc_snappy_arun/_SUCCESS
      -rw-r--r-- 1 cloudera cloudera 306065 2017-05-25 21:32 /user/cloudera/CCApractice/problem4/orc_snappy_arun/part-r-00000-3fe70373-dd2c-414f-80e6-5f2f9c300fc9.orc
      [cloudera@quickstart ~]$
      [cloudera@quickstart ~]$
      [cloudera@quickstart ~]$ hadoop fs -ls /user/cloudera/CCApractice/problem4/orc
      Found 2 items
      -rw-r--r-- 1 cloudera cloudera 0 2017-05-19 15:16 /user/cloudera/CCApractice/problem4/orc/_SUCCESS
      -rw-r--r-- 1 cloudera cloudera 306065 2017-05-19 15:16 /user/cloudera/CCApractice/problem4/orc/part-r-00000-b86a666c-9ad2-4ca9-b82f-f917089a7660.orc
      [cloudera@quickstart ~]$

      Delete
    3. In ORC compression if we don't specify any compression codec than by default it will get SnappyCompression as per given below code of apache-spark (2.X) version https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala

      Delete
    4. I applied the above steps mentioned by Arun and laxman and in both cases my files were not compressed.
      Is there any other way of compressing the orc file data

      sqlContext.setConf("spark.io.compression.codec","gzip")
      avroDF.write.format("orc").save("/user/cloudera/testorc4")


      Delete
    5. I tried to compress orc with snappy with below code and it worked for me.
      SomeDF.write.orc("somePath", compression = 'snappy')

      Delete
  6. Can you explain me why in (K,V) pair, you cast as string instead of Integer ?
    dataFile.map(x=> (x(0).toString,x(0)+"\t"+x(1)+"\t"+x(2)+"\t"+x(3))).saveAsSequenceFile("/user/cloudera/problem5/sequence");

    ReplyDelete
    Replies
    1. Because sequence file needs a RDD in form of (K,V) pair, have a look at its function description in spark documentation.

      Delete
    2. we could have used "," also as seperator, just for confirmation

      Delete
  7. Hi Arun,

    Thanks for the consolidated material, this is a real confidence booster.

    I have a small doubt:
    In step 6, I'm trying the following solution to write the compressed json file:
    val avroDF = sqlContext.read("/user/cloudera/problem5/avro-snappy")
    sqlContext.setConf("set spark.sql.json.compression.codec", "Gzip");
    avroDF.write.json("/user/cloudera/problem5/json-compress")

    It is not compressing the json file . Do you see any error in the above written commands ?

    ReplyDelete
    Replies
    1. Try avroDF.toJSON.saveAsTextFile("/user/cloudera/problem5/json-gzip",classOf[org.apache.hadoop.io.compress.GzipCodec])

      Worked for me!

      Delete
    2. I tried the same in python, but it is saving in textfile in gzip format. any solution?

      Delete
    3. You can try below
      avroDF.write.json("/user/cloudera/problem5/json-compress", compression = 'gzip')

      Delete
  8. Please take a look at this link where I consolidated all file handling mechanisms. JSON compression is also noted here

    http://arun-teaches-u-tech.blogspot.com/p/file-formats.html?m=1


    ReplyDelete
  9. Hi Arun,

    You blog is just amazing! I am finding it really useful to study and practice everything in one place.

    Step8 of this problem in which we have to store the result in ORC.. I get this error...any help is appreciated.

    scala> ordersSeq.map(r => { var a = r._2.toString.split("\t") ; (a(0),a(1),a(2),a(3))}).toDF().write.orc("/user/cloudera/problem4/orc")
    java.lang.AssertionError: assertion failed: The ORC data source can only be used with HiveContext.
    at scala.Predef$.assert(Predef.scala:179)

    Are you planning to come up with any more videos/problem scenarios in this series?

    ReplyDelete
    Replies
    1. Please change the delimiter to .split('\t'); it worked for me

      Delete
  10. Hi Arun,

    Any idea how we can convert a csv to a tab delimited format?

    ReplyDelete
    Replies
    1. Read as text, split every record on ',', then map each record's elements by concatenating them with a tab, and store the file as text.
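
      For example, a minimal sketch (paths are hypothetical):

      sc.textFile("/some/input-csv").map(_.split(",").mkString("\t")).saveAsTextFile("/some/output-tab")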

      Delete
  11. This comment has been removed by the author.

    ReplyDelete
  12. Hi Arun,
    I am unable to save ORC files in compressed formats.
    I have tried the following

    sqlContext.setConf("orc.compress","SNAPPY")
    avroData.write.orc("/user/cloudera/test/orc-snappy")

    Can you please help me out with this ?

    ReplyDelete
  13. As you have told that we can get result from any format and the result is important right! ---- can i do that all compressions from sqoop instead of spark as it is much easy from sqoop compare to spark

    ReplyDelete
  14. This comment has been removed by the author.

    ReplyDelete
  15. This comment has been removed by the author.

    ReplyDelete
  16. I would like to thank Arun for his great effort. This is very useful to prepare CCA 175.
    Today I cleared CCA 175 with 8 out of 9 questions.
    This playlist gave me immense confidence to clear CCA 175 .
    Thanks again Arun!!

    ReplyDelete
  17. Hello Arun. Thanks very much for the content and efforts that you have put-in for benefit of aspirants like me. One query for problem scenario 4 - step 4 - item a - is it sqlContext.setConf("spark.sql.parquet.compression.codec","snappy"); or sqlContext.setConf("spark.sql.parquet.compress.codec","snappy"); As per blog it is compression.codec and as per video it is compress.codec and i tried both, the parquet file with snappy compression of size 270k gets generated when we use compress.codec, however when we use compression.codec, the file still remains of 476k. Please advise. Thanks.

    ReplyDelete
  18. Hi,

    step 5: converting parquet to avro with snappy compression

    I did the following,
    spark-shell --packages com.databricks:spark-avro_2.10:2.0.1 --master yarn --conf spark.ui.port=12345
    import com.databricks.spark.avro._
    val parquetSnappy= sqlContext.read.parquet("/user/rkathiravan/fileformats/ex/ord_avro_parquet_snappy")
    sqlContext.setConf("spark.sql.avro.compression.codec","snappy")
    parquetSnappy.write.avro("/user/rkathiravan/fileformats/ex/ord_avro_parquet_snappy_to_avro_snappy1")

    it runs successfully but
    in that file location
    [rkathiravan@gw01 ~]$ hadoop fs -ls /user/rkathiravan/fileformats/ex/ord_avro_parquet_snappy_to_avro_snappy1
    Found 5 items
    -rw-r--r-- 3 rkathiravan hdfs 0 2017-09-21 15:27 /user/rkathiravan/fileformats/ex/ord_avro_parquet_snappy_to_avro_snappy1/_SUCCESS
    -rw-r--r-- 3 rkathiravan hdfs 195253 2017-09-21 15:27 /user/rkathiravan/fileformats/ex/ord_avro_parquet_snappy_to_avro_snappy1/part-r-00000-17e6c260-7622-41ba-b715-a3d3df2846e0.avro
    -rw-r--r-- 3 rkathiravan hdfs 169247 2017-09-21 15:27 /user/rkathiravan/fileformats/ex/ord_avro_parquet_snappy_to_avro_snappy1/part-r-00001-17e6c260-7622-41ba-b715-a3d3df2846e0.avro
    -rw-r--r-- 3 rkathiravan hdfs 193189 2017-09-21 15:27 /user/rkathiravan/fileformats/ex/ord_avro_parquet_snappy_to_avro_snappy1/part-r-00002-17e6c260-7622-41ba-b715-a3d3df2846e0.avro
    -rw-r--r-- 3 rkathiravan hdfs 172071 2017-09-21 15:27 /user/rkathiravan/fileformats/ex/ord_avro_parquet_snappy_to_avro_snappy1/part-r-00003-17e6c260-7622-41ba-b715-a3d3df2846e0.avro

    i dont see whether it is snappy compressed .
    can anyone help me out.

    ReplyDelete
    Replies
    1. Snappy Compressed AVRO file does not have a "snappy" in its file name. You can compare the file size of the uncompressed avro file and confirm if your compression has worked fine or it. You can also refer to Arun's video.

      Delete
    2. This comment has been removed by the author.

      Delete
    3. you can check the header of an output file.

      [cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/file_formats/spark/avro_snappy/part-r-00000-9eb11396-9935-4480-bca5-8f027f14b9be.avro | head
      Obj avro.codec
      snappy avro.schema� {"type":"record","name":"topLevelRecord","fields":[{"name":"order_idf","type":["string","null"]},{"name":"order_datef","type":["string","null"]},{"name":"order_cidf","type":["string","null"]},{"name":"order_statusf","type":["string","null"]}]}


      you can verify compression technique from here : "Obj avro.codec
      snappy "

      Delete
  19. Hi Arun, Is there any specific reason you used 1 mapper in all your sqoop imports

    ReplyDelete
  21. Hi Arun Sir, I passed(8/9) CCA175 on 05FEB2018. This particular problem is my favorite of all. Your questions helped to take exam confidently. Thanks a lot for helping many like me.

    ReplyDelete
  22. Hi Arun

    Mate, Solution to Problem 7 seems incorrect. We are trying to convert JSON to CSV.
    So our Dataframe will have data like this.

    scala> jsonData.take(2)
    res12: Array[String] = Array({"order_id":1,"order_date":1374735600000,"order_customer_id":11599,"order_status":"CLOSED"}, {"order_id":2,"order_date":1374735600000,"order_customer_id":256,"order_status":"PENDING_PAYMENT"})

    Now

    jsonData.map(x=>x(0)+","+x(1)+","+x(2)+","+x(3)).saveAsTextFile("/user/cloudera/problem5/csv-gzip",classOf[org.apache.hadoop.io.compress.GzipCodec])

    will actually save this data {,",o,r in to CSV before zipping.

    Instead we should use

    df.write.format("com.databricks.spark.csv").option("header", "true").save(blah blah blah)

    Plz correct me if I am wrong.


    ReplyDelete
    Replies
    1. Did you have to make spark csv available using
      spark-shell --packages com.databricks:spark-csv_2.11:1.5.0?
      Will this be available in the exam environment?

      Delete
    2. Its already imported...you don't need...start shell simply

      Delete
  23. Hi Arun,
    Although it was not asked in your exercise but I am facing issue in saving Dataframe (from Avro Data) as TextFile with Snappy Compression. Below is what i used :-


    ordersDf= sqlContext.read.load("/user/cloudera/problem5/avro" , format="com.databricks.spark.avro")
    codec = "org.apache.hadoop.io.compress.SnappyCodec"
    ordersDf.map(lambda rec : ( str(rec[0]) + "\t" + str(rec[1]) + "\t" + str(rec[2]) + "\t" + str(rec[3]) )).saveAsTextFile("/user/cloudera/problem5/text-snappy-compress",codec)

    Cloudera Quickstart VM CDH 12

    Above works without any issue for Gzip and BZip2 but not for Snappy . What could be the reason for same ?


    ReplyDelete
  25. In the exam, is it possible to load com.databricks.spark.csv? In my current setup i assume it is being loaded over http from maven as I have to run spark shell with
    Spark-shell --packages com.databricks:spark-csv_2.11:1.5.0

    ReplyDelete
  26. Hi Arun,

    When you did sqoop import of orders table as avrodatafile, order_date column lost its date format, which is visible when you converted avro file to gzip compressed text file. In fact if you read the parquet file, it has same problem. Is it okay to save order_date in unix format ?

    ReplyDelete
  27. This comment has been removed by the author.

    ReplyDelete
  28. This comment has been removed by the author.

    ReplyDelete
  30. May be this helps to some one .saving avro in snappy compression won't save the file as filename.snappy.avro instead it saves the file as filename.avro with less size.

    ReplyDelete
    Replies
    1. I made this mistake in exam which I took today

      Delete
  31. I think step 6 is not appropriate to save a file in Json format .The given code saves it as a textfile with Json data.I believe the write way is avroData.write.json("destination folder")

    ReplyDelete
  32. Hi,
    In couple of questions, you did mentioned --as-textfile to produce output in form of text file. I believe in sqoop textfile is by default format. So do we still need to mention it?

    ReplyDelete
  33. This comment has been removed by the author.

    ReplyDelete
    Replies
    1. This comment has been removed by the author.

      Delete
  34. @arun Is there any reason why you are not using mkString for each record in map as below?

    json_orders.map(rec => rec.mkString(",")).saveAsTextFile...

    ReplyDelete
  37. All Pyspark answers Part -1

    save the data to hdfs using snappy compression as parquet file at /user/cloudera/problem5/parquet-snappy-compress
    save the data to hdfs using gzip compression as text file at /user/cloudera/problem5/text-gzip-compress
    save the data to hdfs using no compression as sequence file at /user/cloudera/problem5/sequence
    save the data to hdfs using snappy compression as text file at /user/cloudera/problem5/text-snappy-compress

    adf=sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem5/avro/orders")

    sqlContext.setConf("spark.sql.parquet.compression.codec","snappy")
    adf.write.parquet("/user/cloudera/problem5/parquet-snappy-compress")

    tdf1=adf.rdd.map(list)
    tdf1.map(lambda x:str(x[0])+"\t"+str(x[1])+"\t"+str(x[2])+"\t"+str(x[3])).saveAsTextFile("/user/cloudera/problem5/text-gzip-compress","org.apache.hadoop.io.compress.GzipCodec")

    sqlContext.setConf("spark.sql.sequence.compression.codec","uncompressed")
    sdf1=adf.rdd.map(list)
    sdf1.map(lambda x:(str(x[0]),str(x[0])+"\t"+str(x[1])+"\t"+str(x[2])+"\t"+str(x[3]))).saveAsSequenceFile("/user/cloudera/problem5/sequence")

    sqlContext.setConf("spark.sql.text.compression.codec","snappy")
    tdf1=adf.rdd.map(list)
    **************Below didn't work on my Cloudera VM**********************************
    tdf1.map(lambda x:str(x[0])+"\t"+str(x[1])+"\t"+str(x[2])+"\t"+str(x[3])).saveAsTextFile("/user/cloudera/problem5/text-snappy-compress","org.apache.hadoop.io.compress.SnappyCodec")

    ReplyDelete
  38. Part-2

    Transform/Convert data-files at /user/cloudera/problem5/parquet-snappy-compress and store the converted file at the following locations and file formats
    save the data to hdfs using no compression as parquet file at /user/cloudera/problem5/parquet-no-compress
    save the data to hdfs using snappy compression as avro file at /user/cloudera/problem5/avro-snappy

    pdf=sqlContext.read.parquet("/user/cloudera/problem5/parquet-snappy-compress")

    sqlContext.setConf("spark.sql.parquet.compression.codec","uncompressed")
    pdf.write.parquet("/user/cloudera/problem5/parquet-no-compress")

    sqlContext.setConf("spark.sql.avro.compression.codec","snappy")
    pdf.write.format("com.databricks.spark.avro").save("/user/cloudera/problem5/avro-snappy")


    Transform/Convert data-files at /user/cloudera/problem5/avro-snappy and store the converted file at the following locations and file formats
    save the data to hdfs using no compression as json file at /user/cloudera/problem5/json-no-compress
    save the data to hdfs using gzip compression as json file at /user/cloudera/problem5/json-gzip

    adf=sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem5/avro-snappy")

    sqlContext.setConf("spark.sql.json.compression.codec","uncompressed")
    adf.write.json("/user/cloudera/problem5/json-no-compress")

    adf.toJSON().saveAsTextFile("/user/cloudera/problem5/json-gzip","org.apache.hadoop.io.compress.GzipCodec")

    Transform/Convert data-files at /user/cloudera/problem5/json-gzip and store the converted file at the following locations and file formats
    save the data to as comma separated text using gzip compression at /user/cloudera/problem5/csv-gzip

    jdf=sqlContext.read.json("/user/cloudera/problem5/json-gzip")

    tdf=jdf.rdd.map(list)
    tdf.map(lambda x:str(x[0])+"\t"+str(x[1])+"\t"+str(x[2])+"\t"+str(x[3])).saveAsTextFile("/user/cloudera/problem5/csv-gzip","org.apache.hadoop.io.compress.GzipCodec")


    Using spark access data at /user/cloudera/problem5/sequence and stored it back to hdfs using no compression as ORC file to HDFS to destination /user/cloudera/problem5/orc

    srdd=sc.sequenceFile("/user/cloudera/problem5/sequence","org.apache.hadoop.io.Text","org.apache.hadoop.io.Text")
    srddt=srdd.map(lambda x:x[1])
    from pyspark.sql import Row
    sdf1=srddt.map(lambda x:Row(order_id=x.split("\t")[0],order_date=x.split("\t")[1],order_customer_id=x.split("\t")[2],order_status=x.split("\t")[3])).toDF()
    sdf1.write.orc("/user/cloudera/problem5/orc")

    ReplyDelete
  40. Hi Arun

    Thanks for such a informative and useful blog.
    I have a query:
    can we use below mentioned method for compression?
    joinedDF.write.option("codec", "org.apache.hadoop.io.compress.GzipCodec").parquet("/user/saurabhtaksali/problem1/result4b-gzip")

    ReplyDelete
  41. Sqoop import - If question doesn't say about --num-mappers 1 or to store as 1 file then should we do it? Will the software that will check for results check for part files or are we supposed to save as single file always?

    ReplyDelete
  42. hi arun
    in sub problem 6 we read the file in compressed format, ahile saving it into json don't we need to set the conf as uncompressed?

    because above problem you did like uncompressed right? why you didn't do in writing into json files?

    ReplyDelete
  43. Hello arun,
    Thanks for ur blog.I just want to understand while saving as text file from spark can i use spark2-shell with databrics csv with seperator as tab?

    and in CCA 175 exam do i need to write back the header in the solution file also. can you please clarify above ones.. Thanks in advance

    ReplyDelete
  44. Hi Arun,

    1) In certification test, Which method do they ask mostly to solve their big data problems in from RDD, Dataframe and spark sql?
    2) Which one is most performance-based efficient among RDD, Dataframe and spark sql?

    ReplyDelete
