from bson.json_util import dumps
from bs4 import BeautifulSoup as bs4
from contextlib import contextmanager
from datetime import datetime
import random
import requests
import signal
import pymongo
from tldextract import extract

p = print  # shorthand used for logging throughout the script

client = pymongo.MongoClient(
    "mongodb://"
    "kopyl:oleg66@"
    "localhost"  # REMOTE or LOCAL
)
db_clutch = client["clutchwebsites"]["websites"]

# Proxy pool; a random entry is picked for each outgoing request.
proxies = [
    {"https": "209.127.191.180:9279"},
    {"https": "45.95.96.132:8691"},
    {"https": "45.95.96.187:8746"},
    {"https": "45.95.96.237:8796"},
    {"https": "45.136.228.154:6209"},
    {"https": "45.94.47.66:8110"},
    {"https": "45.94.47.108:8152"},
    {"https": "193.8.56.119:9183"},
    {"https": "45.95.99.226:7786"},
    {"https": "45.95.99.20:7580"}
]
class CustomTimeoutError(Exception):
    pass


def raise_timeout(signum, frame):
    raise CustomTimeoutError


@contextmanager
def timeout(time):
    # https://www.jujens.eu/posts/en/2018/Jun/02/python-timeout-function/
    # Raises CustomTimeoutError via SIGALRM if the wrapped block runs
    # longer than `time` seconds (Unix only).
    signal.signal(signal.SIGALRM, raise_timeout)
    signal.alarm(time)
    try:
        yield
    finally:
        signal.signal(signal.SIGALRM, signal.SIG_IGN)
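# Illustrative usage, mirroring how parse_emails() applies it further down;
# the 11-second alarm is a backstop for hangs that requests' own timeout misses:
#     with timeout(11):
#         requests.get(website_link, timeout=10)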
def get_agencies_from_page(page_number):
    # Scrape one listing page of https://clutch.co/us/web-designers.
    proxy = random.choice(proxies)
    p("PROXY:", proxy)
    page = requests.get(
        "https://clutch.co/us/web-designers",
        params={"page": page_number},
        proxies=proxy
    )
    page = bs4(page.text, "lxml")
    agencies = page.select("li.provider")
    new_agencies = []
    for agency in agencies:
        clutch_links = agency.select('a[href^="/profile/"]')
        for link in clutch_links:
            name = link.text.strip()
            # Skip anchors that are not the agency name.
            if not name: continue
            if name == "More": continue
            if name == "View Profile": continue
            clutch_link = link["href"]
            if "#reviews" in link["href"]: continue
            if "#showmore" in link["href"]: continue
            if not agency.select("a.website-link__item"): continue
            website_link = agency.select("a.website-link__item")
            website_link = website_link[0]["href"]
            # Strip Clutch tracking parameters from the outbound URL.
            if "?utm_source" in website_link:
                website_link = website_link.split("?utm_source")
                website_link = website_link[0]
            elif "?utm_campaign" in website_link:
                website_link = website_link.split("?utm_campaign")
                website_link = website_link[0]
            agency_main = {
                "name": name,
                "clutch_link": clutch_link,
                "website_link": website_link
            }
            new_agencies.append(agency_main)
    return new_agencies
def save_all_usa_clutch_pages(start_page=0):
    # 463 is presumably the number of listing pages at the time the script was written.
    for x in range(463):
        if x < start_page:
            continue
        p(f"Getting {x} page")
        pages = get_agencies_from_page(x)
        assert len(pages) <= 50  # Clutch lists at most 50 providers per page
        db_clutch.insert_many(pages)
        p("Page saved")
def parse_phone_and_location(url):
    proxy = random.choice(proxies)
    p("proxy:", proxy)
    page = requests.get(url, proxies=proxy)
    page = bs4(page.text, "lxml")
    contacts = page.select_one("li.quick-menu-details")
    location = contacts.select_one("span").text
    phone = contacts.select_one("a").text.strip()
    phone_and_location = {
        "phone": phone,
        "location": location
    }
    return phone_and_location
def update_phone_and_location():
    agency = db_clutch.find_one({"location": {"$exists": False}})
    if not agency:
        p("No agency left without a location...")
        return
    name = agency["name"]
    p("Getting agency's details:", name)
    clutch_link = f"https://clutch.co{agency['clutch_link']}"
    phone_and_location = parse_phone_and_location(clutch_link)
    db_clutch.update_one(
        {"_id": agency["_id"]},
        {"$set": phone_and_location}
    )
    return agency
def incorrect_domain(email):
    # Returns True when the part after "@" does not look like a valid domain.
    domain = email.split("@")
    if len(domain) != 2:
        return True
    domain = domain[1]
    name_and_zone = domain.split(".")
    if len(name_and_zone) < 2:
        return True
    name = name_and_zone[-2]
    zone = name_and_zone[-1]
    for char in name:
        if not char.isalnum() and char not in [".", "-", "_", "+"]:
            return True
    if not zone.isalnum():
        return True
    return False
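# Illustrative results, given the checks above:
#     incorrect_domain("user@example.com")  -> False
#     incorrect_domain("user@example")      -> True (no dot after "@")
#     incorrect_domain("user@@example.com") -> True (more than one "@")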
def is_email(email):
    # A string counts as an email only if it passes every check below.
    if not all(
        char in email for char in ["@", "."]
    ):
        return False
    if string_has_repetitive_chars(["@", "."], email):
        return False
    if email.startswith("."):
        return False
    if email.endswith("."):
        return False
    if "/" in email:
        return False
    for char in email:
        if not char.isalnum() and char not in ["@", ".", "-", "_", "+"]:
            return False
    if incorrect_domain(email):
        return False
    return True
def string_has_repetitive_chars(chars: list, string: str) -> bool:
    # True when two identical adjacent characters from `chars` occur, e.g. ".." or "@@".
    # Note the chained comparison: string[n] == string[n-1] AND string[n-1] in chars.
    for n, char in enumerate(string):
        if n == 0: continue
        if string[n] == string[n-1] in chars:
            return True
    return False
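# Illustrative results:
#     string_has_repetitive_chars(["@", "."], "john..doe@site.com") -> True
#     string_has_repetitive_chars(["@", "."], "john.doe@site.com")  -> False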
def parse_emails(website_link: str) -> list:
    # Download the page (up to 5 attempts for retryable errors) and collect
    # every attribute value or text node that contains an "@".
    for x in range(5):
        try:
            with timeout(11):
                request = requests.get(website_link, timeout=10)
            break
        except (requests.ConnectTimeout, requests.exceptions.ReadTimeout):
            p("timeout")
            return []
        except requests.exceptions.InvalidURL:
            p("INVALID URL")
            return []
        except requests.exceptions.TooManyRedirects:
            p("Too many redirects")
            return []
        except CustomTimeoutError:
            p("CustomTimeoutError")
            return []
        except requests.exceptions.InvalidSchema:
            p("InvalidSchema:")
            return []
        except requests.exceptions.ConnectionError:
            p("Retry")
            continue
        except requests.exceptions.ChunkedEncodingError:
            p("Retry 2")
            continue
    else:
        # All 5 attempts failed with a retryable error.
        return []
    page = bs4(request.text, "lxml")
    emails = []
    for element in page.find_all():
        for attribute in element.attrs:
            if (
                type(element[attribute]) == str and
                "@" in element[attribute]
            ):
                emails.extend(element[attribute].split())
            elif (
                type(element[attribute]) == list and
                "@" in str(element[attribute])
            ):
                emails.extend(element[attribute])
        if "@" in element.text:
            emails.extend(element.text.split())
    return emails
def clean_emails(emails: list) -> list:
    emails = [email.strip() for email in emails]
    emails = [
        email for email in emails
        if is_email(email)
    ]
    # Deduplicate while preserving order.
    emails = list(dict.fromkeys(emails))
    return emails
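# Illustrative call, assuming the filters above:
#     clean_emails([" info@site.com", "info@site.com", "not-an-email"])
#     -> ["info@site.com"]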
def extract_emails(website_link: str):
    emails = parse_emails(website_link)
    emails = clean_emails(emails)
    return emails
def get_website_root(website_link):
    # Keep only "scheme://host" from a full URL.
    website_link = website_link.split("/")
    website_root = website_link[:3]
    website_root = "/".join(website_root)
    return website_root
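# Illustrative result:
#     get_website_root("https://example.com/about/team") -> "https://example.com"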
def get_domain(website_root):
    # Registered domain (domain + suffix) via tldextract, or None when there is no suffix.
    tsd, td, tsu = extract(website_root)
    if td and tsu:
        url = td + '.' + tsu
    else:
        url = None
    return url
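# Illustrative results (tldextract splits subdomain / domain / suffix):
#     get_domain("https://blog.example.co.uk/post") -> "example.co.uk"
#     get_domain("/contact")                        -> None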
def get_links(website_link):
    # Collect internal top-level paths (e.g. "contact", "about") linked from a page.
    for x in range(5):
        try:
            request = requests.get(website_link, timeout=5)
            break
        except (requests.ConnectTimeout, requests.exceptions.ReadTimeout):
            p("timeout")
            return []
        except requests.exceptions.InvalidURL:
            p("INVALID URL")
            return []
        except requests.exceptions.TooManyRedirects:
            p("Too many redirects")
            return []
        except requests.exceptions.ConnectionError:
            p("Retry")
            continue
        except requests.exceptions.ChunkedEncodingError:
            p("Retry 2")
            continue
    else:
        return []
    page = bs4(request.text, "lxml")
    links = page.select("a")
    # p(links)
    links = [link.get("href") for link in links]
    links = [link for link in links if link]
    links = [
        link.replace(website_link, "")
        for link in links
    ]
    links = [
        link for link in links if
        not any([x in link for x in ["tel:", "javascript:", "skype:"]])
    ]
    links = [
        link.strip() for link in links if
        not any([link == x for x in [" ", "#"]])
    ]
    new_links = []
    for link in links:
        if get_domain(link):
            # Absolute URL: skip external domains, keep only the path part.
            if get_domain(link) != get_domain(website_link):
                continue
            link = link.split(get_domain(link))
            if not link:
                continue
            if len(link) == 2:
                link = link[1]
            else:
                link = link[0]
        link = link.split("/")
        link = [l for l in link if l]
        if not link:
            continue
        link = link[0]
        if link.startswith("#"):
            continue
        if link not in new_links:
            new_links.append(link)
    return new_links
# Relative paths tried on each agency site, in order, when hunting for emails.
pages_to_visit = [
    "",
    "contact",
    "about",
    "contact-us",
    "about-us",
    "careers",
    "contact.html",
    "contacts",
    "contact.php",
    "support",
    "team"
]
def get_emails_amount_may_be_saved():
    # Rough projection of how many emails the full run might yield.
    # The +1 keeps the ratio defined on a fresh collection.
    emails_saved_amount = (
        db_clutch.count_documents({"emails": {"$gt": {"$size": 0}}})
    ) + 1
    websites_scraped_amount = (
        db_clutch.count_documents({"emails": {"$exists": True}})
    ) + 1
    emails_per_website = emails_saved_amount / websites_scraped_amount
    # 23146 is presumably the total number of agency documents to process.
    emails_may_be_saved = int(emails_per_website * 23146)
    return emails_may_be_saved, emails_saved_amount, websites_scraped_amount
# Specific URLs that are skipped outright during scraping.
links_to_avoid = [
    "https://uniqueamb.com/team",
    "https://www.sleeplessmedia.com/about"
]
def scrape_emails():
    # Visit each agency website that has no "emails" field yet and store the
    # first batch of addresses found. Assumes every document already carries a
    # "website_root" field (e.g. as produced by get_website_root()).
    for n, agency in enumerate(list(db_clutch.find({"emails": {"$exists": False}})), 1):
        emails_amount_may_be_saved, emails_saved, websites_scraped_amount = (
            get_emails_amount_may_be_saved()
        )
        _id = agency["_id"]
        website_root = agency["website_root"]
        name = agency["name"]
        website_links = [
            (
                f"{website_root}/{page}".strip(),
                page,
                pages_to_visit.index(page) + 1
            )
            for page in pages_to_visit
        ]
        emails = None
        for website_link, page, ln in website_links:
            p(f"{websites_scraped_amount}.{ln}:", "Downloading", website_link)
            if website_link in links_to_avoid:
                continue
            emails = extract_emails(website_link)
            if emails:
                source = f"/{page}" if page else "/"
                p(
                    "Email/s found",
                    "Total Emails:", emails_saved,
                    "possibly_emails_will_be_saved",
                    emails_amount_may_be_saved
                )
                emails = {
                    "source": source,
                    "emails": emails,
                    "saved_on": datetime.now()
                }
                break
        # Record the result (or None) so this agency is not visited again.
        db_clutch.update_one(
            {"_id": _id},
            {
                "$set": {"emails": emails}
            }
        )


scrape_emails()