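"""Scrape a Twitter/X user's tweets and replies with Playwright.

Opens a visible Firefox window for a manual login, scrolls the user's
"tweets & replies" timeline with exponential-backoff rate-limit handling,
and saves the cleaned text to a timestamped JSON file.
"""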
from playwright.async_api import async_playwright
import asyncio
import json
from datetime import datetime
import re
import random
import logging

# Set up logging for better tracking
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TwitterScraper:
    def __init__(self, max_retries=5, base_delay=1, max_delay=300):
        self.base_delay = base_delay    # Base delay in seconds
        self.max_delay = max_delay      # Max delay (5 minutes)
        self.current_delay = self.base_delay
        self.max_retries = max_retries  # Max retries for a failed operation (currently unused)
        self.consecutive_errors = 0
        self.seen_tweets = set()        # IDs of tweets already collected
        self.tweets = []
    async def handle_rate_limit(self, page):
        """Handle rate limiting with exponential backoff."""
        self.consecutive_errors += 1
        # Exponential backoff with jitter: with base_delay=1 the waits grow
        # ~2 s, 4 s, 8 s, ... per consecutive failure, capped at max_delay,
        # plus up to 1 s of random jitter to avoid synchronized retries.
        self.current_delay = min(
            self.base_delay * (2 ** self.consecutive_errors) + random.uniform(0, 1),
            self.max_delay,
        )
        logger.warning(f"Rate limit detected. Waiting for {self.current_delay:.1f} seconds...")
        await asyncio.sleep(self.current_delay)
        # Try to find and click the retry button; fall back to a full reload
        try:
            retry_button = await page.query_selector('div[role="button"]:has-text("Retry")')
            if retry_button:
                await retry_button.click()
            else:
                await page.reload()
        except Exception as e:
            logger.error(f"Error during retry: {e}")
            await page.reload()
        await page.wait_for_load_state('networkidle')
        return True
    async def check_for_rate_limit(self, page):
        """Check if we've hit a rate limit or error state."""
        try:
            error_message = await page.query_selector('div:has-text("Something went wrong")')
            if error_message:
                return await self.handle_rate_limit(page)
            # Decrease the error count on success so delays shrink again
            self.consecutive_errors = max(0, self.consecutive_errors - 1)
            return False
        except Exception as e:
            logger.error(f"Error checking for rate limit: {e}")
            return False
    async def process_tweet(self, tweet):
        """Process a tweet element, extracting its text and timestamp."""
        try:
            text_element = await tweet.query_selector('div[data-testid="tweetText"]')
            if not text_element:
                return None
            text = await text_element.inner_text()
            timestamp_element = await tweet.query_selector('time')
            timestamp = await timestamp_element.get_attribute('datetime') if timestamp_element else None
            # Create a unique identifier for the tweet (text + timestamp)
            tweet_id = f"{text}_{timestamp}"
            if tweet_id not in self.seen_tweets:
                self.seen_tweets.add(tweet_id)
                return {
                    'text': text,
                    'role': 'assistant',
                    'content': text,
                    'timestamp': timestamp,
                    'id': tweet_id,
                }
            return None
        except Exception as e:
            logger.error(f"Error processing tweet: {e}")
            return None
    async def get_tweets_and_replies(self, username, tweet_count=1000):
        """Scrape tweets and replies from a user's profile."""
        async with async_playwright() as p:
            browser = await p.firefox.launch(headless=False)
            page = await browser.new_page()
            # First navigate to Twitter and wait for a manual login
            await page.goto('https://twitter.com/login')
            logger.info("Please log in to Twitter manually in the browser window.")
            logger.info("Once you're logged in, press Enter to start scraping...")
            await asyncio.get_running_loop().run_in_executor(None, input)
            # Navigate to the user's tweets & replies
            logger.info(f"Navigating to {username}'s tweets & replies...")
            await page.goto(f"https://twitter.com/{username}/with_replies")
            await page.wait_for_selector('article')
            logger.info("Starting to collect tweets and replies...")
            last_tweet_count = 0
            no_new_tweets_count = 0
            scroll_pause_time = 1
            while len(self.tweets) < tweet_count:
                # Check for rate limiting
                if await self.check_for_rate_limit(page):
                    continue
                # Get all tweet elements currently in the DOM
                tweet_elements = await page.query_selector_all('article')
                # Process tweets concurrently
                tasks = [self.process_tweet(tweet) for tweet in tweet_elements]
                processed_tweets = await asyncio.gather(*tasks)
                # Add new unique tweets (process_tweet returns None for duplicates)
                new_tweets = [t for t in processed_tweets if t is not None]
                self.tweets.extend(new_tweets)
                logger.info(f"Collected {len(self.tweets)} unique tweets/replies")
                # Check whether we're still finding new tweets
                if len(self.tweets) == last_tweet_count:
                    no_new_tweets_count += 1
                    if no_new_tweets_count > 5:
                        logger.info("No new tweets found after multiple scrolls. Stopping...")
                        break
                else:
                    no_new_tweets_count = 0
                last_tweet_count = len(self.tweets)
                # Scroll with a variable pause time
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                await asyncio.sleep(scroll_pause_time)
                # Gradually increase the scroll pause if we're being rate limited,
                # and relax it back toward 1 second otherwise
                if self.consecutive_errors > 0:
                    scroll_pause_time = min(scroll_pause_time * 1.2, 5)
                else:
                    scroll_pause_time = max(scroll_pause_time * 0.9, 1)
            await browser.close()
            return self.tweets[:tweet_count]
def clean_tweets(tweets):
    """Clean and format tweets for further processing."""
    cleaned_tweets = []
    for tweet in tweets:
        # Remove URLs and @mentions, then collapse whitespace
        text = re.sub(r'https?://\S+|@\w+', '', tweet['text'])
        text = re.sub(r'\s+', ' ', text).strip()
        if text:  # Only keep non-empty tweets
            cleaned_tweets.append({
                'role': 'assistant',
                'content': text,
                'timestamp': tweet['timestamp'],
            })
    return cleaned_tweets
async def main():
    username = input("Enter the Twitter username to scrape (without @): ").strip()
    tweet_count = int(input("Enter number of tweets to collect (default 1000): ") or 1000)
    logger.info("Starting Twitter scraper...")
    scraper = TwitterScraper()
    tweets = await scraper.get_tweets_and_replies(username, tweet_count)
    # Clean and format the tweets
    cleaned_tweets = clean_tweets(tweets)
    # Save to a timestamped JSON file
    filename = f"{username}_tweets_and_replies_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(cleaned_tweets, f, ensure_ascii=False, indent=2)
    logger.info(f"Collected and saved {len(cleaned_tweets)} unique tweets/replies to {filename}")


if __name__ == "__main__":
    asyncio.run(main())
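

# --- Optional follow-up sketch (not part of the script above) ---
# The 'role'/'content' fields suggest the cleaned output is intended as
# chat-style fine-tuning data. A minimal, hypothetical conversion of the
# saved JSON into JSONL (one {"messages": [...]} record per line) might
# look like the function below; the function name and file paths are
# assumptions, and it is defined here but never called.

def tweets_json_to_jsonl(json_path, jsonl_path):
    """Convert the scraper's JSON output to one-record-per-line JSONL."""
    with open(json_path, encoding='utf-8') as f:
        tweets = json.load(f)
    with open(jsonl_path, 'w', encoding='utf-8') as f:
        for tweet in tweets:
            record = {'messages': [{'role': tweet['role'], 'content': tweet['content']}]}
            f.write(json.dumps(record, ensure_ascii=False) + '\n')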