BDOT LR4

package sevsu.spark

import java.nio.file.Paths

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Application {
    // Local Spark configuration; the console progress bar is disabled to keep output readable
    private val conf: SparkConf = new SparkConf()
        .setMaster("local[*]")
        .setAppName("spark_example")
        .set("spark.ui.showConsoleProgress", "false")

    private val sc: SparkContext = getSparkContext(conf)

    // Input CSVs are read from the classpath resources directory
    private val resourcesRoot: String = this.getClass.getResource("/").toString
    private val personPath: String = resourcesRoot + "person.csv"
    private val apartmentPath: String = resourcesRoot + "apartment.csv"

    case class Person(id: Int, name: String)

    case class Apartment(id_apartment: Int, id_human: Int, num_rooms: Int, address: String)

    def main(args: Array[String]): Unit = {
        val rawPersonRdd: RDD[String] = sc.textFile(personPath)
        val rawApartmentRdd: RDD[String] = sc.textFile(apartmentPath)

        // Parse each CSV line into a case class; rows are assumed to be well-formed
        val persons = rawPersonRdd.map(strPerson => {
            strPerson.split(",").map(_.trim) match {
                case Array(id, name) => Person(id.toInt, name)
            }
        })
        val apartments = rawApartmentRdd.map(strApartment => {
            strApartment.split(",").map(_.trim) match {
                case Array(id_apartment, id_human, num_rooms, address) =>
                    Apartment(id_apartment.toInt, id_human.toInt, num_rooms.toInt, address)
            }
        })
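
        // Note: the matches above are non-exhaustive, so a row with the wrong
        // field count (e.g. an address containing a comma) fails the job with
        // a MatchError. A more defensive variant would skip bad rows instead;
        // a minimal sketch (`safePersons` is illustrative and unused below):
        //
        //   val safePersons = rawPersonRdd.flatMap(line =>
        //       line.split(",").map(_.trim) match {
        //           case Array(id, name) => Some(Person(id.toInt, name))
        //           case _               => None
        //       })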

        // ====== Task 1: count apartments per person, then group person ids by that count ======
        val personCounter: RDD[(Int, Int)] = apartments.map(item => (item.id_human, 1))
        val numPersonApartments = personCounter.reduceByKey((a, b) => a + b) // (id_human, count)
        // Invert to (count, id_human), collect ids per count, sort by count
        val numApartmentsPerson = numPersonApartments.map(_.swap).groupByKey().sortByKey()

        println(numApartmentsPerson.collect().mkString("\n"))


        // ====== Task 2: attach names to the per-person apartment counts ======
        val personPairRDD = persons.map(item => (item.id, item.name))
        // Inner join on person id yields (id, (name, count)); keep only (name, count)
        val joined = personPairRDD join numPersonApartments
        val numPersonNameApartments = joined.map(item => item._2)

        println(numPersonNameApartments.collect().mkString("\n"))


        // ====== Task 3: addresses of apartments with more than two rooms, with owner names ======
        val personApartmentsAddress = apartments
            .filter(_.num_rooms > 2)
            .map(item => (item.id_human, item.address))
        // Join on person id yields (id, (address, name)); keep only (address, name)
        val personNameAddressWithId = personApartmentsAddress join personPairRDD
        val personNameAddress = personNameAddressWithId.map(item => item._2)

        println(personNameAddress.collect().mkString("\n"))

        sc.stop()
    }

    private def getSparkContext(conf: SparkConf): SparkContext = {
        // On Windows, Spark needs winutils.exe; point hadoop.home.dir at the bundled copy
        if (System.getProperty("os.name").toLowerCase.contains("windows")) {
            System.setProperty(
                "hadoop.home.dir",
                Paths.get(this.getClass.getResource("/winutils/hadoop-2.7.1/").toURI).toString
            )
        }

        new SparkContext(conf)
    }
}
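For reference, a hypothetical run. The paste does not include the data files, so the person.csv and apartment.csv contents below are invented to illustrate the three tasks (fields: id, name and id_apartment, id_human, num_rooms, address; addresses are kept comma-free to match the split(",") parsing):

    person.csv          apartment.csv
    1, Ivanov           1, 1, 3, Lenina 10
    2, Petrov           2, 1, 1, Mira 5
    3, Sidorov          3, 2, 2, Pobedy 7

With that input the program would print roughly the following (row order after the shuffles is not guaranteed; persons owning no apartments, like Sidorov, drop out of the inner joins):

    (1,CompactBuffer(2))    // Task 1: one person (id 2) owns 1 apartment
    (2,CompactBuffer(1))    //         one person (id 1) owns 2 apartments
    (Ivanov,2)              // Task 2: (name, apartment count)
    (Petrov,1)
    (Lenina 10,Ivanov)      // Task 3: (address of a >2-room apartment, owner name)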