Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import asyncio
from asyncio import Queue
from os import getenv

from loguru import logger
from lxml.etree import HTML
from more_itertools import first
from pyppeteer import launch
from pyppeteer.errors import TimeoutError as PyppeteerTimeout
from pyppeteer.page import Page
from pyppeteer_stealth import stealth
# Single product page (kept for ad-hoc testing of the review scraper).
product_url = (
    'https://detail.tmall.com/item.htm?'
    'spm=a1z10.5-b-s.w4011-21229599754.159.a43d3ab8lMMAyo&'
    'id=636149087216&rn=6d43f66ab34e0ad782135f76e059ddc7&abbucket=1'
)
# Shop-wide "all products" listing — the master's entry point.
url_all_prd = 'https://skecherstx.tmall.com/?search=y'
# Set to True by master() once every product URL has been queued;
# polled by the workers to decide when to stop.
exit_flag = False
async def parse_page(page: Page):
    """Scrape all review pages for the product currently loaded in *page*.

    Opens the "accumulated reviews" tab, then walks the paginator,
    logging one dict per review row until no next-page link remains.

    :param page: a pyppeteer Page already navigated to a product detail URL.
    """
    # Dismiss the login dialog if it pops up. waitForXPath raises
    # pyppeteer's TimeoutError (not the builtin the original caught)
    # when the dialog never appears — that case is fine, just continue.
    try:
        elem = await page.waitForXPath('//div[@class="baxia-dialog-close"]')
        await elem.click()
    except PyppeteerTimeout:
        pass
    # Click the reviews tab.
    review = await page.waitForXPath('//a[text()="累计评价 "]/parent::li')
    # magic sleep: give the page a moment to settle so the click registers
    await asyncio.sleep(1)
    await review.click()
    while True:
        await page.waitForXPath('//div[@class="rate-grid"]//tr')
        # Parse the full rendered HTML once instead of per-element queries.
        html = HTML(await page.content())
        rows = html.xpath('//div[@class="rate-grid"]//tr')
        for row in rows:
            d = {
                'rate_content': first(row.xpath('.//div[@class="tm-rate-content"]/div[@class="tm-rate-fulltxt"]//text()'), None),
                'rate_date': first(row.xpath('.//div[@class="tm-rate-date"]/text()'), None),
                'rate_reply': first(row.xpath('.//div[@class="tm-rate-reply"]//text()'), None),
                'rate_sku': ';'.join(row.xpath('.//div[@class="rate-sku"]/p/text()')),
                'rate_user': ''.join(row.xpath('.//div[@class="rate-user-info"]//text()')),
            }
            logger.info(d)
        # waitForXPath raises on timeout rather than returning None, so the
        # original `if not next_page: break` could never fire and the loop
        # had no clean exit. Treat a timeout as "no next page" and stop.
        try:
            next_page = await page.waitForXPath('//div[@class="rate-paginator"]/span[not(@class)]/following-sibling::a')
        except PyppeteerTimeout:
            break
        await next_page.click()
async def init_page(page: Page):
    """Prepare a freshly created page: stealth patches + fixed viewport."""
    await stealth(page)
    viewport = {'width': 1200, 'height': 960}
    await page.setViewport(viewport)
    return page
@logger.catch
async def worker(browser, queue):
    """Consume product URLs from *queue* and scrape each one's reviews.

    Runs until the master has signalled completion (module-level
    ``exit_flag``) and the queue is fully drained.

    :param browser: pyppeteer Browser used to open a dedicated page.
    :param queue: asyncio.Queue of product URL strings fed by master().
    """
    page = await browser.newPage()
    page = await init_page(page)
    logger.info('worker initialized')
    while not exit_flag or not queue.empty():
        # A plain `await queue.get()` would block forever once the master
        # sets exit_flag with the queue empty; poll with a timeout so the
        # loop condition is re-evaluated and the worker can actually exit.
        try:
            url = await asyncio.wait_for(queue.get(), timeout=1)
        except asyncio.TimeoutError:
            continue
        logger.debug(f'going to url: {url}')
        await page.goto(url)
        await parse_page(page)
@logger.catch
async def master(browser, queue):
    """Walk the shop's product listing, putting every product URL on *queue*.

    Sets the module-level ``exit_flag`` when finished — including on
    error — so the workers can drain the queue and stop.

    :param browser: pyppeteer Browser used to open a dedicated page.
    :param queue: asyncio.Queue the workers consume from.
    """
    global exit_flag
    page = await browser.newPage()
    page = await init_page(page)
    try:
        # Open the all-products listing.
        await page.goto(url_all_prd)
        while True:
            await page.waitForXPath('//div[contains(@class, "item") and contains(@class, "line")]')
            # Extract product links from the rendered HTML.
            html = HTML(await page.content())
            urls = html.xpath('//div[contains(@class, "item") and contains(@class, "line")]//dt/a/@href')
            for prd_url in urls:
                await queue.put(prd_url)
            # Next listing page. waitForXPath raises pyppeteer's TimeoutError
            # (not the builtin one the original caught) when the link is
            # missing — last page reached, or anti-bot detection kicked in.
            try:
                next_page = await page.waitForXPath('//div[@class="pagination"]/a[@class="page-cur"]/following-sibling::a')
            except PyppeteerTimeout:
                logger.info('Might be detected')
                break
            await next_page.click()
    finally:
        # Always signal completion, even on error, so workers don't hang.
        exit_flag = True
async def main(headless: bool = True, n_workers: int = 1):
    """Launch the browser and run one master alongside *n_workers* workers.

    :param headless: run Chromium without a visible window.
    :param n_workers: number of concurrent review-scraping workers.
    """
    logger.info('Starting')
    launch_args = ['--no-sandbox']
    browser = await launch(headless=headless, args=launch_args)
    queue = Queue()
    # One master producing URLs, n_workers consumers.
    tasks = [master(browser, queue)]
    tasks.extend(worker(browser, queue) for _ in range(n_workers))
    await asyncio.gather(*tasks)
if __name__ == '__main__':
    # os.getenv returns a *string* when the variable is set; the original
    # passed it straight through to range() inside main(), raising
    # TypeError whenever N_WORKERS was actually configured. Coerce to int.
    n = int(getenv('N_WORKERS', 1))
    asyncio.run(main(False, n))
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement