from pyspark import SparkContext
from math import isqrt, log10
# Generate a sieve of primes up to the square root of the limit.
# This runs sequentially on the driver; the result is small enough to ship to every task.
def generate_sieve_upto_sqrt(limit):
    sieve_to_return = []
    sieve_bound = isqrt(limit) + 1
    sieve = [True] * sieve_bound
    sieve[0], sieve[1] = False, False  # 0 and 1 are not primes
    for num in range(2, isqrt(sieve_bound) + 1):
        if sieve[num]:
            for multiple in range(num * num, sieve_bound, num):
                sieve[multiple] = False
    for num, is_prime in enumerate(sieve):
        if is_prime:
            sieve_to_return.append(num)
    return sieve_to_return
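# Example (hand-checkable): generate_sieve_upto_sqrt(100) sieves 0..isqrt(100) = 0..10
# and returns [2, 3, 5, 7].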
# Use the small sieve to find the primes within a single segment
def apply_sieve_on_segment(segment, small_sieve):
    segment_start, segment_end = segment
    if segment_start < 2:
        segment_start = 2  # 0 and 1 are never prime
    segment_sieve = [True] * (segment_end - segment_start + 1)
    primes_in_segment = []
    for prime in small_sieve:
        # First multiple of this prime inside the segment, but never below prime**2
        first_multiple = max(prime * prime, ((segment_start + prime - 1) // prime) * prime)
        # Cross off all multiples of the current prime within the segment
        for multiple in range(first_multiple, segment_end + 1, prime):
            segment_sieve[multiple - segment_start] = False
    # Collect the numbers whose sieve entry is still True: these are the primes
    for num in range(segment_start, segment_end + 1):
        if segment_sieve[num - segment_start]:
            primes_in_segment.append(num)
    return primes_in_segment
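# Example (hand-checkable): apply_sieve_on_segment((10, 30), [2, 3, 5]) crosses
# off the composites in [10, 30] and returns [11, 13, 17, 19, 23, 29].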
if __name__ == "__main__":
    sc = SparkContext.getOrCreate()
    max_num = 10**7  # 10 million
    """
    Generate a small sieve once and ship it to each task for efficient filtering.
    The advantage lies in the reduced number of comparisons: for a large limit
    such as 1 trillion (10**12), the small sieve only has to cover numbers up
    to 10**6, which contains just 78,498 primes. Each segment can therefore be
    sieved with far fewer crossing-off passes than trial division would need.
    """
    small_sieve = generate_sieve_upto_sqrt(max_num)
    num_slices = int(2 ** log10(max_num))  # e.g. 2**7 = 128 partitions for 10**7
    range_step = max_num // num_slices
    # Create the list of segments to distribute; each segment is an inclusive
    # (start_num, end_num) tuple. Subtracting 1 from the end keeps consecutive
    # segments from overlapping, which would otherwise emit duplicate primes.
    ranges = [(i, min(i + range_step - 1, max_num)) for i in range(2, max_num + 1, range_step)]
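    # For instance, max_num = 10**7 gives num_slices = 128, range_step = 78125,
    # and the segments [(2, 78126), (78127, 156251), ..., (9921877, 10000000)].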
    ranges_rdd = sc.parallelize(ranges, numSlices=num_slices)
    # Apply the sieve on each segment in parallel
    primes_rdd = ranges_rdd.flatMap(lambda segment: apply_sieve_on_segment(segment, small_sieve))
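    # Note: small_sieve is captured in the flatMap closure, so Spark serializes
    # it with every task. A possible tweak (not in the original) for much larger
    # sieves is a broadcast variable, which ships it to each executor only once:
    #   small_sieve_bc = sc.broadcast(small_sieve)
    #   primes_rdd = ranges_rdd.flatMap(
    #       lambda seg: apply_sieve_on_segment(seg, small_sieve_bc.value))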
    # primes = primes_rdd.collect()  # only for limits small enough to fit on the driver
    # Save the primes to a text file, one per line
    primes_rdd.map(str).saveAsTextFile("primes")
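    # Sanity check: for max_num = 10**7 the output should contain 664,579 primes
    # (the value of pi(10**7)); primes_rdd.count() can confirm this.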