Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package sevsu.spark
- import java.nio.file.Paths
- import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}
- import org.apache.spark.rdd.RDD
- import sevsu.spark.Application.time
/**
 * Spark lab runner.
 *
 * Reads `person.csv` and `apartment.csv` from the classpath root and runs the same three
 * tasks twice: [[lab4]] on plain pair RDDs and [[lab5]] on range-partitioned, cached pair
 * RDDs. The wall-clock time of every task (and of each lab as a whole) is printed.
 */
object Application {

  private val conf: SparkConf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("spark_example")
    .set("spark.ui.showConsoleProgress", "false")

  // Lazy on purpose: an eager val would start a SparkContext (and resolve classpath
  // resources) as a side effect of merely referencing this object, e.g. to call `time`.
  private lazy val sc: SparkContext = getSparkContext(conf)

  // `getResource` returns null when the classpath has no directory root,
  // so the lookup is wrapped to fail with a readable message instead of an NPE.
  private lazy val resourcesRoot: String =
    Option(getClass.getResource("/"))
      .map(_.toString)
      .getOrElse(throw new IllegalStateException("Classpath root resource \"/\" was not found"))

  private lazy val personPath: String = s"${resourcesRoot}person.csv"
  private lazy val apartmentPath: String = s"${resourcesRoot}apartment.csv"

  /** Number of partitions requested from every `RangePartitioner` built in [[lab5]]. */
  private val NumPartitions = 4

  /** One record of `person.csv`. */
  final case class Person(id: Int, name: String)

  /** One record of `apartment.csv`; `id_human` is joined against `Person.id`. */
  final case class Apartment(id_apartment: Int, id_human: Int, num_rooms: Int, address: String)

  /**
   * Tracks the RDDs persisted during one lab run so they can all be released together.
   * Only used on the driver; it is never captured by an RDD closure.
   */
  private final class CacheScope {
    private val cached = scala.collection.mutable.ArrayBuffer.empty[RDD[_]]

    /** Marks `rdd` for caching and remembers it for [[release]]. */
    def persist[T](rdd: RDD[T]): RDD[T] = {
      cached += rdd
      rdd.persist()
    }

    /** Unpersists every RDD registered through [[persist]]. */
    def release(): Unit = cached.foreach(_.unpersist())
  }

  /**
   * Evaluates `block`, prints the elapsed wall-clock time in seconds
   * (rounded half-up to 4 decimal places) and returns the block's result.
   *
   * @param block the computation to measure; evaluated exactly once
   * @tparam R    result type of the computation
   * @return the value produced by `block`
   */
  def time[R](block: => R): R = {
    val startedAt = System.nanoTime()
    val result = block // by-name parameter: the work happens here
    val elapsedNanos = System.nanoTime() - startedAt
    val resultTime = BigDecimal(elapsedNanos / 1e9)
      .setScale(4, BigDecimal.RoundingMode.HALF_UP)
    println(s"Время: $resultTime сек.")
    result
  }

  /**
   * Optionally prints the contents of `result`, then prints `label` and the time a
   * `collect()` of `result` takes.
   *
   * Note: when `printResults` is true the RDD is collected twice, so the timed run may
   * benefit from anything cached by the first one.
   */
  private def runTask[T](label: String, result: RDD[T], printResults: Boolean): Unit = {
    if (printResults) {
      println(result.collect().mkString("\n"))
    }
    print(label)
    time(result.collect())
    ()
  }

  /**
   * Runs the three tasks on plain (not explicitly partitioned) pair RDDs.
   *
   *  1. Counts apartments per `id_human`, then groups the owner ids by that count,
   *     sorted by count.
   *  1. Joins the counts with persons, yielding `(name, apartmentCount)`.
   *  1. For apartments with more than 2 rooms, yields `(address, ownerName)`.
   *
   * @param apartments   apartment records
   * @param persons      person records
   * @param printResults when true, each task's result is printed before it is timed
   */
  def lab4(apartments: RDD[Apartment], persons: RDD[Person], printResults: Boolean = true): Unit = {
    println("========== lab4 ==========")
    val cache = new CacheScope
    try {
      // ====== Task 1 ======
      val numPersonApartments: RDD[(Int, Int)] = cache.persist(
        apartments
          .map(apartment => (apartment.id_human, 1))
          .reduceByKey(_ + _)
      )
      val numApartmentsPerson = numPersonApartments
        .map(_.swap)
        .groupByKey()
        .sortByKey()
      runTask("1 задание - ", numApartmentsPerson, printResults)

      // ====== Task 2 ======
      val personPairRDD = persons.map(person => (person.id, person.name))
      val numPersonNameApartments = personPairRDD.join(numPersonApartments).values
      runTask("2 задание - ", numPersonNameApartments, printResults)

      // ====== Task 3 ======
      val personApartmentsAddress = apartments
        .filter(_.num_rooms > 2)
        .map(apartment => (apartment.id_human, apartment.address))
      val personNameAddress = personApartmentsAddress.join(personPairRDD).values
      runTask("3 задание - ", personNameAddress, printResults)
    } finally {
      // Release the cache so it does not stay resident for whatever runs next.
      cache.release()
    }
  }

  /**
   * Runs the same three tasks as [[lab4]], but range-partitions and caches the pair RDDs
   * before aggregating and joining them. Each task yields the same kind of result as its
   * [[lab4]] counterpart.
   *
   * @param apartments   apartment records
   * @param persons      person records
   * @param printResults when true, each task's result is printed before it is timed
   */
  def lab5(apartments: RDD[Apartment], persons: RDD[Person], printResults: Boolean = true): Unit = {
    println("\n========== lab5 ==========")
    val cache = new CacheScope
    try {
      // ====== Task 1 ======
      val pairApartments = apartments.map(apartment => (apartment.id_human, 1))
      val partitionedPairApartments = cache.persist(
        pairApartments.partitionBy(new RangePartitioner(NumPartitions, pairApartments))
      )
      val numPersonApartments = cache.persist(partitionedPairApartments.reduceByKey(_ + _))
      val numApartmentsPerson = numPersonApartments
        .map(_.swap)
        .groupByKey()
        .sortByKey()
      runTask("1 задание - ", numApartmentsPerson, printResults)

      // ====== Task 2 ======
      val personPairRDD = persons.map(person => (person.id, person.name))
      val partitionedPersonPairRDD = cache.persist(
        personPairRDD.partitionBy(new RangePartitioner(NumPartitions, personPairRDD))
      )
      // Keep both joined values, (name, apartmentCount), exactly like lab4 does;
      // projecting only the name would throw away the count the join was made for.
      val numPersonNameApartments: RDD[(String, Int)] =
        partitionedPersonPairRDD.join(numPersonApartments).values
      runTask("2 задание - ", numPersonNameApartments, printResults)

      // ====== Task 3 ======
      val personApartmentsAddress = apartments
        .filter(_.num_rooms > 2)
        .map(apartment => (apartment.id_human, apartment.address))
      val partitionedPersonApartmentsAddress = cache.persist(
        personApartmentsAddress.partitionBy(
          new RangePartitioner(NumPartitions, personApartmentsAddress)
        )
      )
      // Join against the already partitioned and cached persons RDD, and keep the
      // (address, ownerName) pair, matching lab4's result for this task.
      val personNameAddress: RDD[(String, String)] =
        partitionedPersonApartmentsAddress.join(partitionedPersonPairRDD).values
      runTask("3 задание - ", personNameAddress, printResults)
    } finally {
      cache.release()
    }
  }

  /** Parses one `id,name` line; fails with the offending line in the message. */
  private def parsePerson(line: String): Person =
    line.split(",").map(_.trim) match {
      case Array(id, name) => Person(id.toInt, name)
      case _ => throw new IllegalArgumentException(s"Malformed person record: '$line'")
    }

  /** Parses one `id_apartment,id_human,num_rooms,address` line; fails with the offending line in the message. */
  private def parseApartment(line: String): Apartment =
    line.split(",").map(_.trim) match {
      case Array(idApartment, idHuman, numRooms, address) =>
        Apartment(idApartment.toInt, idHuman.toInt, numRooms.toInt, address)
      case _ => throw new IllegalArgumentException(s"Malformed apartment record: '$line'")
    }

  /**
   * Entry point: loads both CSV files, runs [[lab4]] and [[lab5]] without printing their
   * results, and reports the total time of each lab.
   */
  def main(args: Array[String]): Unit = {
    val context = sc
    try {
      // Blank lines carry no record, so they are skipped rather than treated as malformed.
      val persons: RDD[Person] = context
        .textFile(personPath)
        .filter(_.trim.nonEmpty)
        .map(parsePerson)
      val apartments: RDD[Apartment] = context
        .textFile(apartmentPath)
        .filter(_.trim.nonEmpty)
        .map(parseApartment)

      time(lab4(apartments, persons, printResults = false))
      time(lab5(apartments, persons, printResults = false))
    } finally {
      // Stop the context even when a lab fails.
      context.stop()
    }
  }

  /**
   * Creates the SparkContext. On Windows, `hadoop.home.dir` is first pointed at the
   * bundled `/winutils/hadoop-2.7.1/` classpath resource when that resource exists.
   */
  private def getSparkContext(conf: SparkConf): SparkContext = {
    val onWindows = System.getProperty("os.name").toLowerCase.contains("windows")
    if (onWindows) {
      val winutils = Option(getClass.getResource("/winutils/hadoop-2.7.1/"))
      winutils.foreach { url =>
        System.setProperty("hadoop.home.dir", Paths.get(url.toURI).toString)
      }
      if (winutils.isEmpty) {
        Console.err.println(
          "Resource /winutils/hadoop-2.7.1/ not found on the classpath; hadoop.home.dir left unset"
        )
      }
    }
    new SparkContext(conf)
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement