xosski

Twitter scraper

Jan 1st, 2025
from playwright.async_api import async_playwright
import asyncio
import json
from datetime import datetime
import re
import random
import logging

# Set up logging for better tracking
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

class TwitterScraper:
    def __init__(self, max_retries=5, base_delay=1, max_delay=300):
        self.base_delay = base_delay          # Base delay in seconds
        self.max_delay = max_delay            # Max delay (5 minutes)
        self.current_delay = self.base_delay
        self.max_retries = max_retries        # Max retries for a failed operation
        self.consecutive_errors = 0
        self.seen_tweets = set()              # Unique tweet identifiers seen so far
        self.tweets = []                      # Collected tweet dictionaries

    async def handle_rate_limit(self, page):
        """Handle rate limiting with exponential backoff."""
        self.consecutive_errors += 1
        # Exponential backoff with jitter, capped at max_delay
        self.current_delay = min(
            self.base_delay * (2 ** self.consecutive_errors) + random.uniform(0, 1),
            self.max_delay,
        )

        logger.warning(f"Rate limit detected. Waiting for {self.current_delay:.1f} seconds...")
        await asyncio.sleep(self.current_delay)

        # Try to find and click the retry button; fall back to a full reload
        try:
            retry_button = await page.query_selector('div[role="button"]:has-text("Retry")')
            if retry_button:
                await retry_button.click()
            else:
                await page.reload()
        except Exception as e:
            logger.error(f"Error during retry: {e}")
            await page.reload()

        await page.wait_for_load_state('networkidle')
        return True

    async def check_for_rate_limit(self, page):
        """Check if we've hit a rate limit or error state."""
        try:
            error_message = await page.query_selector('div:has-text("Something went wrong")')
            if error_message:
                return await self.handle_rate_limit(page)
            # Decrease the error count on success
            self.consecutive_errors = max(0, self.consecutive_errors - 1)
            return False
        except Exception as e:
            logger.error(f"Error checking for rate limit: {e}")
            return False

    async def process_tweet(self, tweet):
        """Process a single tweet element and extract its text and timestamp."""
        try:
            text_element = await tweet.query_selector('div[data-testid="tweetText"]')
            if not text_element:
                return None

            text = await text_element.inner_text()
            timestamp_element = await tweet.query_selector('time')
            timestamp = await timestamp_element.get_attribute('datetime') if timestamp_element else None

            # Create a unique identifier for the tweet (text + timestamp)
            tweet_id = f"{text}_{timestamp}"

            if tweet_id not in self.seen_tweets:
                self.seen_tweets.add(tweet_id)
                return {
                    'text': text,
                    'role': 'assistant',
                    'content': text,
                    'timestamp': timestamp,
                    'id': tweet_id,
                }
            return None
        except Exception as e:
            logger.error(f"Error processing tweet: {e}")
            return None

    async def get_tweets_and_replies(self, username, tweet_count=1000):
        """Scrape tweets and replies from a user's profile."""
        async with async_playwright() as p:
            browser = await p.firefox.launch(headless=False)
            page = await browser.new_page()

            # First navigate to Twitter and wait for a manual login
            await page.goto('https://twitter.com/login')
            logger.info("Please log in to Twitter manually in the browser window.")
            logger.info("Once you're logged in, press Enter to start scraping...")
            await asyncio.get_running_loop().run_in_executor(None, input)

            # Navigate to the user's tweets & replies
            logger.info(f"Navigating to {username}'s tweets & replies...")
            await page.goto(f"https://twitter.com/{username}/with_replies")
            await page.wait_for_selector('article')

            logger.info("Starting to collect tweets and replies...")
            last_tweet_count = 0
            no_new_tweets_count = 0
            scroll_pause_time = 1

            while len(self.tweets) < tweet_count:
                # Check for rate limiting before processing the page
                if await self.check_for_rate_limit(page):
                    continue

                # Get all tweet elements currently loaded on the page
                tweet_elements = await page.query_selector_all('article')

                # Process tweets concurrently
                tasks = [self.process_tweet(tweet) for tweet in tweet_elements]
                processed_tweets = await asyncio.gather(*tasks)

                # Add new unique tweets
                new_tweets = [t for t in processed_tweets if t is not None]
                self.tweets.extend(new_tweets)
                logger.info(f"Collected {len(self.tweets)} unique tweets/replies")

                # Check whether we're still getting new tweets
                if len(self.tweets) == last_tweet_count:
                    no_new_tweets_count += 1
                    if no_new_tweets_count > 5:
                        logger.info("No new tweets found after multiple scrolls. Stopping...")
                        break
                else:
                    no_new_tweets_count = 0
                    last_tweet_count = len(self.tweets)

                # Scroll to the bottom, then pause for a variable amount of time
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                await asyncio.sleep(scroll_pause_time)

                # Slow the scrolling down while rate limited, speed back up otherwise
                if self.consecutive_errors > 0:
                    scroll_pause_time = min(scroll_pause_time * 1.2, 5)
                else:
                    scroll_pause_time = max(scroll_pause_time * 0.9, 1)

            await browser.close()

        return self.tweets[:tweet_count]

def clean_tweets(tweets):
    """Clean and format tweets for further processing."""
    cleaned_tweets = []
    for tweet in tweets:
        # Remove URLs and @mentions, then collapse whitespace
        text = re.sub(r'https?://\S+|@\w+', '', tweet['text'])
        text = re.sub(r'\s+', ' ', text).strip()

        if text:  # Only keep non-empty tweets
            cleaned_tweets.append({
                'role': 'assistant',
                'content': text,
                'timestamp': tweet['timestamp'],
            })
    return cleaned_tweets

async def main():
    username = input("Enter the Twitter username to scrape (without @): ").strip()
    tweet_count = int(input("Enter number of tweets to collect (default 1000): ") or 1000)

    logger.info("Starting Twitter scraper...")
    scraper = TwitterScraper()
    tweets = await scraper.get_tweets_and_replies(username, tweet_count)

    # Clean and format tweets
    cleaned_tweets = clean_tweets(tweets)

    # Save to file
    filename = f"{username}_tweets_and_replies_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(cleaned_tweets, f, ensure_ascii=False, indent=2)

    logger.info(f"Collected and saved {len(cleaned_tweets)} unique tweets/replies to {filename}")


if __name__ == "__main__":
    asyncio.run(main())
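
A minimal sketch of reading the saved output back in. The file name below is hypothetical (main() builds the real one from the username and a timestamp); each entry carries the 'role', 'content', and 'timestamp' keys that clean_tweets() emits.

import json

# Hypothetical example path; main() generates the actual file name at runtime.
with open("someuser_tweets_and_replies_20250101_120000.json", encoding="utf-8") as f:
    tweets = json.load(f)

# Print the first few cleaned tweets with their timestamps.
for tweet in tweets[:5]:
    print(tweet["timestamp"], tweet["content"])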