Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import os
- import re
- import requests
- import json
- import sqlite3
- from pylog import PyLog
- from datetime import datetime
- from sys import argv
# SQLite file that stores previously submitted reports, and the shared
# connection used by the database helpers in this file.
database = 'abuseip.db'
conn = sqlite3.connect(database)

# Log lines whose client address equals this value are skipped by the parser.
myIp = "0.0.0.0"

# Root of the AbuseIPDB v2 API and the key sent in the 'Key' request header.
base_url = 'https://api.abuseipdb.com/api/v2/'
api_key = 'YOUR_API_KEY_HERE'

log = PyLog('abuseip.log')

# Addresses that check() refuses to look up.
whitelist = ['0.0.0.0']

# Switched to True by the command-line entry point when "force_report" is given.
force_report = False
def close():
    """Close the module-level SQLite connection ``conn``."""
    conn.close()
def write_to_db(ip, confidence, categories, timestamp):
    """Store one report row in the ``ip_reports`` table.

    The table is created if it does not exist yet, a single row is inserted
    and the change is committed on the module-level connection.

    Args:
        ip: IP address the row is about.
        confidence: Value stored in the ``confidence`` column.
        categories: Category string, stored unchanged.
        timestamp: Timestamp string, stored unchanged.
    """
    create_table_sql = (
        'CREATE TABLE IF NOT EXISTS ip_reports ('
        'ip TEXT, confidence INTEGER, categories TEXT, timestamp TEXT)'
    )
    insert_sql = 'INSERT INTO ip_reports VALUES (?, ?, ?, ?)'
    row = (ip, confidence, categories, timestamp)

    cursor = conn.cursor()
    cursor.execute(create_table_sql)
    cursor.execute(insert_sql, row)
    conn.commit()
def read_from_db(ip):
    """Return all ``ip_reports`` rows whose ``ip`` column equals *ip*.

    The result is a list of row tuples and is empty when nothing matches.
    This function does not create the table; that only happens in
    ``write_to_db``.
    """
    cursor = conn.cursor()
    cursor.execute('SELECT * FROM ip_reports WHERE ip = ?', (ip,))
    rows = cursor.fetchall()
    return rows
def check(ip):
    """Look up *ip* on AbuseIPDB and decide whether it should be reported.

    Whitelisted addresses are never sent to the API.

    Args:
        ip: IP address to look up.

    Returns:
        A ``(flagged, confidence, total_reports)`` tuple. ``flagged`` is
        True only when the API shows at least one report and a confidence
        score above zero. In every other case (whitelisted address, API
        error reply, no reports, or reports with a zero score) the result
        is ``(False, None, 0)``.
    """
    not_flagged = (False, None, 0)

    if ip in whitelist:
        log.warning(f'IP address: {ip} is whitelisted.')
        return not_flagged

    response = requests.get(
        base_url + 'check',
        headers={'Accept': 'application/json', 'Key': api_key},
        params={'ipAddress': ip, 'maxAgeInDays': '365'},
    )
    decoded_response = json.loads(response.text)

    # An error reply carries an 'errors' list and no 'data' object, so
    # indexing ['data'] blindly would raise KeyError. Log it the same way
    # ban() does and treat the address as not flagged.
    if 'errors' in decoded_response:
        error = decoded_response['errors'][0]['detail']
        log.error(f'Error: {error}')
        return not_flagged

    data = decoded_response['data']
    ipAddress = data['ipAddress']
    totalReports = data['totalReports']
    confidence = data['abuseConfidenceScore']

    if totalReports > 0 and confidence > 0:
        log.warning(
            f'The IP address: {ipAddress} has previous reports on '
            f'file, and a confidence score of {confidence}.'
        )
        return (True, confidence, totalReports)

    if totalReports > 0 and confidence == 0:
        log.normal(
            f'The IP address: {ipAddress} is suspicious as it has been '
            f'reported, but appears legitimate, so sayeth AbuseIPDB.'
        )
    else:
        log.info(f'No abuse reports found for IP address: {ip}')
    return not_flagged
def ban(ip, categories, timestamp, comment):
    """Submit a report for *ip* to the AbuseIPDB ``report`` endpoint.

    The outcome is only logged: the first error detail when the reply
    contains ``errors``, otherwise the reported address and its confidence
    score. Nothing is returned.

    Args:
        ip: IP address to report.
        categories: Category string sent as the ``categories`` parameter.
        timestamp: Timestamp string sent as the ``timestamp`` parameter.
        comment: Free text sent as the ``comment`` parameter.
    """
    report_fields = {
        'ip': ip,
        'categories': categories,
        'timestamp': timestamp,
        'comment': comment
    }
    request_headers = {
        'Accept': 'application/json',
        'Key': api_key
    }
    reply = requests.post(
        base_url + 'report',
        headers=request_headers,
        params=report_fields,
    )
    payload = json.loads(reply.text)

    if 'errors' not in payload.keys():
        reported_ip = payload['data']['ipAddress']
        score = payload['data']['abuseConfidenceScore']
        log.success(
            f'You\'ve successfully reported the IP address: {reported_ip}! '
            f'Confidence score: {score}'
        )
        return

    detail = payload['errors'][0]['detail']
    log.error(f'Error: {detail}')
def parse_nginx_log(file_path):
    """Parse an nginx access log and report the client IPs AbuseIPDB flags.

    Each line matching ``<ip> - - [<timestamp>] "<request>"`` contributes its
    request string to the comment collected for that IP. Lines from ``myIp``
    are ignored. Unless ``force_report`` is set, an IP that already has rows
    in the local database is skipped (with a warning that says whether one
    of those rows has the same timestamp as the current line).

    After the file has been read, every collected IP is passed to
    ``check()``. When that returns a flagged result, the IP is reported with
    ``ban()`` and recorded with ``write_to_db()``, using the timestamp of the
    last collected log line for that IP.

    Args:
        file_path: Path of the access log to read.

    Returns:
        The number of distinct IP addresses collected from the log (zero
        when no line matched).

    Raises:
        ValueError: If a matched line has a timestamp that does not follow
            ``%d/%b/%Y:%H:%M:%S %z``.
    """
    line_pattern = re.compile(
        r'(\d+\.\d+\.\d+\.\d+) - - \[(.*?)\] "(.*?)"'
    )
    timestamp_format = "%d/%b/%Y:%H:%M:%S %z"

    # Created up front so a log without any matching line yields 0 instead
    # of failing on an unbound local.
    ip_requests = {}      # ip -> list of request strings, in log order
    ip_timestamps = {}    # ip -> ISO timestamp of its last collected line
    previous_reports = {}  # ip -> rows already stored for it (looked up once)

    with open(file_path, 'r') as file:
        for line in file:
            match = line_pattern.match(line)
            if not match:
                continue
            ip, timestamp, message = match.groups()
            if ip == myIp:  # Skip this IP address
                continue

            # Convert the nginx timestamp to ISO 8601; this is the form that
            # is sent to the API and stored in the database.
            atom_timestamp = datetime.strptime(
                timestamp, timestamp_format
            ).isoformat()

            if not force_report:
                if ip not in previous_reports:
                    try:
                        previous_reports[ip] = read_from_db(ip)
                    except sqlite3.Error as e:
                        # E.g. the table does not exist yet on a fresh
                        # database: treat the IP as never reported.
                        log.error(f'Error: {e}')
                        previous_reports[ip] = []
                rows = previous_reports[ip]
                # Only skip when there really is an earlier report. Stored
                # timestamps are ISO strings, so compare against the
                # converted value rather than the raw nginx text.
                if rows:
                    if any(row[3] == atom_timestamp for row in rows):
                        log.warning(
                            f'IP: {ip} has been reported before.'
                        )
                    else:
                        log.warning(
                            f'IP: {ip} has been reported before, '
                            f'but not at this time.'
                        )
                    continue

            ip_requests.setdefault(ip, []).append(message)
            ip_timestamps[ip] = atom_timestamp

    for ip, requests_seen in ip_requests.items():
        message = " | ".join(requests_seen)
        # Keep the comment short: anything over 1024 characters is cut to
        # 1020 characters plus an ellipsis.
        if len(message) > 1024:
            message = message[:1020] + '...'

        c = check(ip)
        if not c[0]:
            continue

        # AbuseIPDB category IDs: '18' is always sent, the others are added
        # when the collected requests contain the corresponding marker.
        cats = '18'
        if 'Custom-AsyncHttpClient' in message:
            cats += ',15'
        if 'GET /robots.txt' in message:
            cats += ',19'
        if ('GET /wp-admin' in message or
                'GET /wp-login.php' in message or
                'GET /wp-content' in message):
            cats += ',21'

        report_timestamp = ip_timestamps[ip]
        ban(ip, cats, report_timestamp, message)
        write_to_db(ip, c[1], cats, report_timestamp)

    return len(ip_requests)
if __name__ == "__main__":
    # Command-line entry point:
    #   python abuseip.py <nginx_log_file> [force_report]
    # The try/finally makes sure the shared SQLite connection is closed on
    # every exit path, including the early SystemExit ones.
    try:
        if len(argv) < 2:
            log.usage(
                "Usage: python abuseip.py <nginx_log_file> [force_report]"
            )
            # SystemExit instead of exit(): exit() is a helper installed by
            # the site module and is not guaranteed to exist.
            raise SystemExit(1)

        log_file_path = argv[1]
        if len(argv) == 3 and argv[2] == "force_report":
            log.info("Force report mode enabled.")
            force_report = True

        if not os.path.exists(log_file_path):
            log.error(f"Error: File {log_file_path} not found.")
            raise SystemExit(1)

        count = parse_nginx_log(log_file_path)
        log.info(f"Total unique IP addresses: {count}")
    finally:
        close()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement