Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import shelve
- from os import path
- from pathlib import Path
- from aiofile import async_open
- from httpx import AsyncClient
- from loguru import logger
- from lxml.etree import HTML
- from more_itertools import first
# Listing-page URL; the placeholder is '' for page 1 and '_<n>' for page n.
url_tmpl = 'http://www.netbian.com/index{}.htm'

# Folder that receives the downloaded images.
downloads_dir = Path('downloads')

# Translation table that deletes characters not allowed in Windows file names.
trans = str.maketrans(dict.fromkeys(r'\/:*?"<>|'))

# Log to a file as well, rotated weekly.
logger.add('netbian.log', rotation='1 weeks')

# Browser-like User-Agent sent with every request.
_USER_AGENT = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
    'AppleWebKit/537.36 (KHTML, like Gecko) '
    'Chrome/89.0.4389.128 Safari/537.36'
)
headers = {'User-Agent': _USER_AGENT}

# Persistent key/value store; holds the 'cur_page' resume checkpoint.
cache = shelve.open('cache', writeback=True)
async def download_img(client: AsyncClient, url):
    """Download the image shown on one detail page into ``downloads_dir``.

    The page at ``url`` is fetched and decoded as GBK. The first ``<img>``
    inside ``<div class="pic">`` supplies the image address (``src``) and the
    file name (``title``). Characters in ``trans`` are stripped from the name
    and the extension is taken from the image URL.

    Args:
        client: HTTP client used for both the page and the image request.
        url: Address of the detail page.

    Returns:
        None. Nothing is written when the page has no matching image or the
        image request does not answer with status 200 (the latter is logged).
    """
    r = await client.get(url)
    r.encoding = 'gbk'
    html = HTML(r.text)
    img_url = first(html.xpath('//div[@class="pic"]//img/@src'), None)
    title = first(html.xpath('//div[@class="pic"]//img/@title'), None)
    if not img_url:
        return

    stem, ext = path.splitext(path.basename(img_url))
    # The <img> may have no title, or a title made only of forbidden
    # characters; fall back to the image's own base name so we neither crash
    # on None nor write a file named just ".jpg".
    file_name = (title or '').translate(trans) or stem.translate(trans)

    r = await client.get(img_url)
    if r.status_code != 200:
        logger.warning(f'{img_url=} got wrong status code: {r.status_code}')
        return

    async with async_open(downloads_dir / f'{file_name}{ext}', 'wb') as f:
        async for chunk in r.aiter_bytes():
            await f.write(chunk)
async def main(start_over=False):
    """Walk the listing pages and download every image they link to.

    Starts at the page number stored under ``cache['cur_page']`` (page 1 when
    nothing is stored or ``start_over`` is true). For each listing page, all
    linked detail pages are handed to ``download_img`` concurrently. After a
    page completes, the next page number is stored in ``cache`` and synced.

    Args:
        start_over: When true, ignore the stored checkpoint and begin at
            page 1.
    """
    logger.info('Starting')
    cur_page = 1 if start_over else cache.get('cur_page', 1)
    # The first listing page has no numeric suffix; later pages use "_<n>".
    cur_url = url_tmpl.format(f'_{cur_page}' if cur_page != 1 else '')
    async with AsyncClient(headers=headers, timeout=30) as client:
        while cur_url:
            r = await client.get(cur_url)
            html = HTML(r.text)
            tasks = [
                asyncio.create_task(download_img(client, r.url.join(rel_url)))
                for rel_url in html.xpath('//div[@class="list"]//li/a[img]/@href')
            ]
            await asyncio.gather(*tasks)

            # NOTE(review): the last a.prev link is presumably the
            # "next page" link on this site - confirm against the markup.
            next_href = first(html.xpath('//a[@class="prev"][last()]/@href'), None)
            cur_url = r.url.join(next_href) if next_href else None

            logger.info(f'page: {cur_page} finish download')
            cur_page += 1
            cache['cur_page'] = cur_page
            # Flush the checkpoint now so an interrupted run can resume from
            # this page instead of relying on the shelf being closed cleanly.
            cache.sync()
    logger.info('Finished downloading')
if __name__ == '__main__':
    # Script entry point: make sure the output folder exists, then crawl.
    downloads_dir.mkdir(exist_ok=True)
    try:
        asyncio.run(main())
    finally:
        # Close the shelf explicitly so the resume checkpoint is written out
        # even when the crawl ends with an error or Ctrl-C.
        cache.close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement