package sevsu.spark

import java.nio.file.Paths

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Application {

  private val conf: SparkConf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("spark_example")
    .set("spark.ui.showConsoleProgress", "false")

  private val sc: SparkContext = getSparkContext(conf)

  private val resourcesRoot: String = this.getClass.getResource("/").toString
  private val personPath: String = resourcesRoot + "person.csv"
  private val apartmentPath: String = resourcesRoot + "apartment.csv"

  case class Person(id: Int, name: String)
  case class Apartment(id_apartment: Int, id_human: Int, num_rooms: Int, address: String)
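  // Expected row formats, inferred from the parsers below (example values are hypothetical):
  //   person.csv:    "<id>,<name>"                                         e.g. "1,Ivan"
  //   apartment.csv: "<id_apartment>,<id_human>,<num_rooms>,<address>"     e.g. "10,1,3,Lenina 5"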
  def main(args: Array[String]): Unit = {
    val rawPersonRdd: RDD[String] = sc.textFile(personPath)
    val rawApartmentRdd: RDD[String] = sc.textFile(apartmentPath)

    // Parse each CSV line into a Person; a malformed line throws a MatchError.
    val persons: RDD[Person] = rawPersonRdd.map { strPerson =>
      strPerson.split(",").map(_.trim) match {
        case Array(id, name) => Person(id.toInt, name)
      }
    }

    // Parse each CSV line into an Apartment (same caveat on malformed lines).
    val apartments: RDD[Apartment] = rawApartmentRdd.map { strApartment =>
      strApartment.split(",").map(_.trim) match {
        case Array(id_apartment, id_human, num_rooms, address) =>
          Apartment(id_apartment.toInt, id_human.toInt, num_rooms.toInt, address)
      }
    }
    // ====== Task 1: count apartments per owner, then group owner ids by that count ======
    val personCounter: RDD[(Int, Int)] = apartments.map(item => (item.id_human, 1))
    val numPersonApartments = personCounter.reduceByKey((a, b) => a + b)
    val numApartmentsPerson = numPersonApartments.map(_.swap).groupByKey().sortByKey()
    println(numApartmentsPerson.collect().mkString("\n"))
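    // numApartmentsPerson is RDD[(Int, Iterable[Int])]: (apartment count, ids of the
    // owners with that count), sorted by count in ascending order.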
    // ====== Task 2: attach each owner's name to their apartment count ======
    val personPairRDD = persons.map(item => (item.id, item.name))
    val joined = personPairRDD.join(numPersonApartments)
    val numPersonNameApartments = joined.map(item => item._2)
    println(numPersonNameApartments.collect().mkString("\n"))
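    // The inner join keys on person id, so people who own no apartments are dropped;
    // the result is RDD[(String, Int)]: (name, apartment count).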
    // ====== Task 3: owner names and addresses of apartments with more than two rooms ======
    val personApartmentsAddress = apartments
      .filter(_.num_rooms > 2)
      .map(item => (item.id_human, item.address))
    val personNameAddressWithId = personApartmentsAddress.join(personPairRDD)
    val personNameAddress = personNameAddressWithId.map(item => item._2)
    println(personNameAddress.collect().mkString("\n"))
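    // personNameAddress is RDD[(String, String)]: (address, owner name) for every
    // apartment that passed the num_rooms > 2 filter.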
    sc.stop()
  }
  private def getSparkContext(conf: SparkConf): SparkContext = {
    // On Windows, Hadoop needs winutils; point hadoop.home.dir at the copy bundled
    // on the classpath under /winutils/hadoop-2.7.1/ before creating the context.
    if (System.getProperty("os.name").toLowerCase.contains("windows")) {
      System.setProperty(
        "hadoop.home.dir",
        Paths.get(this.getClass.getResource("/winutils/hadoop-2.7.1/").toURI).toString
      )
    }
    new SparkContext(conf)
  }
}