Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import datetime
import json
import os
import socket
import sys
import threading
from email.utils import formatdate
from pprint import pprint as print
from urllib.parse import unquote

import magic
class Config:
    """In-memory server configuration: log directory plus virtual hosts.

    Virtual hosts are plain dicts that must provide the keys ``"ip"``,
    ``"port"`` and ``"vhost"``.  They are kept both in insertion order
    (``vhosts``) and grouped by the ``(ip, port)`` pair they listen on
    (``address_dict``).
    """

    def __init__(self):
        self.log_dir = None      # directory log files are written to
        self.vhosts = []         # every vhost dict, in the order it was added
        self.address_dict = {}   # (ip, port) -> list of vhost dicts

    def set_log_dir(self, log_dir):
        """Remember *log_dir* and create it (with parents) if it is missing.

        Raises:
            OSError: if the directory cannot be created, including when
                *log_dir* already exists but is not a directory.
        """
        self.log_dir = log_dir
        # makedirs(exist_ok=True) creates missing parents and avoids the
        # check-then-create race of a separate os.path.exists() test.
        os.makedirs(self.log_dir, exist_ok=True)

    def add_vhost(self, vhost):
        """Register *vhost* and index it under its ``(ip, port)`` address."""
        self.vhosts.append(vhost)
        address = (vhost["ip"], vhost["port"])
        self.address_dict.setdefault(address, []).append(vhost)

    def get_unique_ip_ports(self):
        """Return a view of the distinct ``(ip, port)`` pairs registered."""
        return self.address_dict.keys()

    def get_address_via_host(self, host):
        """Return the ``(ip, port)`` serving the vhost named *host*, or None."""
        for address, vhosts in self.address_dict.items():
            if any(host == vhost["vhost"] for vhost in vhosts):
                return address
        return None
class Worker(threading.Thread):
    """Thread that answers a single HTTP request on an accepted socket.

    The worker reads one request, builds a response for ``GET``/``HEAD``
    (anything else gets ``405``), sends it, closes the connection and then
    appends one line to a log file in ``config.log_dir``.

    Args:
        client_socket: connected socket returned by ``accept()``.
        client_address: peer address; index 0 is logged as the client IP.
        server_address: ``(ip, port)`` the accepting server is bound to.
        config: object providing ``log_dir`` and ``get_address_via_host``.
    """

    def __init__(self, client_socket, client_address, server_address, config):
        threading.Thread.__init__(self)
        self.socket = client_socket
        self.address = client_address
        self.server_address = server_address
        self.config = config

    def run(self):
        """Serve the request; the socket is closed even when handling fails."""
        try:
            self.request = self.socket.recv(1024)
            if not self.request:
                # Peer connected and hung up without sending anything.
                return
            self.process_request()
            self.generate_response()
            # sendall() keeps writing until the whole response is out;
            # send() may stop after a partial write.
            self.socket.sendall(self.response)
        finally:
            self.socket.close()
        self.log_request()

    def process_request(self):
        """Parse ``self.request`` into method, path, version and headers.

        Header names are lower-cased.  Lines without a ``:`` and anything
        after the blank line that ends the header section are ignored.

        Raises:
            ValueError: if the request line does not have exactly three
                space-separated parts.
            UnicodeDecodeError: if the request is not valid UTF-8.
        """
        decoded_request = self.request.decode().strip()
        # Only the section before the first blank line holds headers.
        head = decoded_request.partition("\r\n\r\n")[0]
        request_line, _, header_block = head.partition("\r\n")

        self._method, self._path, self._http_version = request_line.split(" ")
        # Decode every percent-escape, not just "%20".
        self._path = unquote(self._path)

        self._headers = {}
        for line in header_block.split("\r\n"):
            name, colon, value = line.partition(":")
            if colon:
                self._headers[name.strip().lower()] = value.strip()

    def generate_response(self):
        """Build ``self.response`` (bytes) for the parsed request.

        ``HEAD`` responses carry headers only.  Methods other than ``GET``
        and ``HEAD`` are answered with ``405`` and an empty body.
        """
        self._generate_response_headers()
        if self._method in ("GET", "HEAD"):
            self._generate_response_body()
        else:
            self._status_code = "405 METHOD NOT ALLOWED"
            self.response_headers["allow"] = "GET, HEAD"
            self.response_body = b""
        self.response_headers["content-length"] = len(self.response_body)

        lines = [f"{self._http_version} {self._status_code}\r\n"]
        lines.extend(
            f"{key}:{value}\r\n" for key, value in self.response_headers.items()
        )
        lines.append("\r\n")
        head = "".join(lines).encode()

        if self._method == "HEAD":
            self.response = head
        else:
            self.response = head + self.response_body

    def log_request(self):
        """Append one line describing the handled request to a log file.

        Successful (``200``) requests go to ``<log_dir>/<host>.log``; every
        other status goes to ``<log_dir>/error.log``.  Only a 200 response
        guarantees the host name matched a configured vhost, so a
        client-supplied host never ends up in a file name otherwise.
        """
        date = datetime.datetime.now().strftime("%a %b %d %H:%M:%S %Y")
        ip = self.address[0]
        host = self._headers.get("host", "-").split(":")[0]
        status_code = self._status_code.split(" ")[0]
        path = self._path
        content_length = self.response_headers["content-length"]
        user_agent = self._headers.get("user-agent", "-")

        if status_code == "200":
            file_name = self.config.log_dir + "/" + host + ".log"
        else:
            file_name = self.config.log_dir + "/error.log"

        with open(file_name, "a+") as file:
            file.write(
                f"[{date}] {ip} {host} {path} {status_code} {content_length} {user_agent}\n"
            )

    def _generate_response_headers(self):
        """Initialise ``self.response_headers`` with the fixed defaults."""
        # NOTE(review): "connection: keep-alive" is advertised although run()
        # closes the socket after one response - confirm this is intended.
        self.response_headers = {
            "server": "lashiko-v1.0",
            "date": formatdate(timeval=None, localtime=False, usegmt=True),
            "connection": "keep-alive",
            "content-type": "text/html",
            "Accept-Ranges": " bytes",
            "keep-alive": "5",
            "etag": "A2E2EFC1",
        }

    def _generate_response_body(self):
        """Set ``self.response_body`` (and the status) for the request path."""
        path = self._generate_requested_path()
        if not path or not os.path.exists(path):
            self.response_body = self._generate_response_body_404()
        elif os.path.isfile(path):
            self.response_body = self._generate_response_body_200_file(path)
        else:
            self.response_body = self._generate_response_body_200_dir(path)

    def _generate_response_body_404(self):
        """Set the 404 status and return the fixed error page."""
        self._status_code = "404 NOT FOUND"
        return b"<html><body><h1>REQUESTED DOMAIN NOT FOUND</h1></body></html>"

    def _generate_response_body_200_file(self, path):
        """Return the bytes of *path*, honouring a ``Range`` header if usable.

        A Range header that cannot be parsed is ignored and the whole file
        is served.
        """
        # NOTE(review): ranged responses are still sent with "200 OK" and no
        # content-range header; a 206 reply would be the standard form.
        self._status_code = "200 OK"
        self.response_headers["content-type"] = magic.Magic(mime=True).from_file(path)

        offset, length = 0, None
        if "range" in self._headers:
            try:
                offset, length = self._get_offset_and_length_from_header(
                    self._headers["range"]
                )
            except ValueError:
                offset, length = 0, None

        with open(path, "rb") as file:
            file.seek(offset)
            if length is not None:
                return file.read(length)
            return file.read()

    def _generate_response_body_200_dir(self, path):
        """Return an HTML page linking to every entry of directory *path*."""
        self._status_code = "200 OK"
        # Links are prefixed with the last component of the directory path.
        base_url = os.path.basename(path)
        url_lists = "<br>".join(
            f'<a href={base_url}/{item.replace(" ", "%20")}>{base_url}/{item}</a>'
            for item in os.listdir(path)
        )
        return f"<html><body><ul>{url_lists}</ul></body></html>".encode()

    def _generate_requested_path(self):
        """Map the request onto a path below the vhost's directory.

        Returns:
            ``"<host>/<path>"`` relative to the working directory, or None
            when the Host header is missing or has no port, the host is not
            served on this worker's server address, or the path would leave
            the vhost directory.
        """
        host_header = self._headers.get("host", "")
        if ":" not in host_header:
            return None
        host = host_header.partition(":")[0]
        address = self.config.get_address_via_host(host)
        if address != self.server_address:
            return None

        requested = (host + "/" + self._path).replace("//", "/")
        # Reject ".." sequences and symlinks that resolve outside the vhost root.
        root = os.path.realpath(host)
        resolved = os.path.realpath(requested)
        if resolved != root and not resolved.startswith(root + os.sep):
            return None
        return requested

    def _get_offset_and_length_from_header(self, range_str):
        """Parse ``"bytes=<first>-[<last>]"`` into ``(offset, length)``.

        ``length`` is None for an open-ended range; otherwise it is the
        number of bytes from ``first`` to ``last`` inclusive.

        Raises:
            ValueError: if the header is not a single ``first-last`` range
                with integer bounds, or ``last`` is smaller than ``first``.
        """
        bytes_str = range_str[6:]  # drop the "bytes=" prefix
        first, last = bytes_str.split("-")
        offset = int(first)
        if not last:
            return offset, None
        end = int(last)
        if end < offset:
            raise ValueError(f"invalid byte range: {range_str!r}")
        # Both bounds are inclusive, hence the +1.
        return offset, end - offset + 1
class Server(threading.Thread):
    """Listener thread that hands every accepted connection to a Worker.

    Args:
        ip: address the listening socket binds to.
        port: TCP port the listening socket binds to.
        config: configuration object forwarded to each Worker.
    """

    def __init__(self, ip, port, config):
        threading.Thread.__init__(self)
        self.config = config
        self.port = port
        self.ip = ip

    def _start_socket_connection(self):
        """Create the TCP listening socket, bind it and start listening."""
        # NOTE(review): SO_REUSEADDR is not set on this socket - confirm
        # whether that is intended.
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        endpoint = (self.ip, self.port)
        self.sock.bind(endpoint)
        self.sock.listen(1024)

    def run(self):
        """Accept connections forever, starting one Worker thread for each."""
        self._start_socket_connection()
        local_address = (self.ip, self.port)
        while True:
            client_socket, client_address = self.sock.accept()
            handler = Worker(
                client_socket, client_address, local_address, self.config
            )
            handler.start()
def start_virtual_host_servers(config):
    """Start one Server thread per distinct (ip, port) pair in *config*."""
    for address in config.get_unique_ip_ports():
        ip, port = address
        listener = Server(ip, port, config)
        listener.start()
def read_config_file():
    """Build a Config from the JSON file named by the first CLI argument.

    The JSON document must provide ``"log"`` (log directory) and
    ``"server"`` (iterable of virtual-host entries).

    Returns:
        The populated Config instance.

    Raises:
        SystemExit: with a usage message and a non-zero status when no
            config file argument was given.
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
        KeyError: if ``"log"`` or ``"server"`` is missing.
    """
    if len(sys.argv) < 2:
        # sys.exit(message) writes to stderr and exits with status 1.  This
        # file rebinds ``print`` to pprint and the bare exit() helper is only
        # provided by the ``site`` module, so neither is used here.
        sys.exit("Please pass config file. Example: python main.py config.json")

    config = Config()
    with open(sys.argv[1], "r", encoding="utf-8") as config_file:
        content = json.load(config_file)

    config.set_log_dir(content["log"])
    for vhost in content["server"]:
        config.add_vhost(vhost)
    return config
if __name__ == "__main__":
    # Load the configuration named on the command line, then start one
    # listening server for every unique (ip, port) pair it declares.
    start_virtual_host_servers(read_config_file())
Add Comment
Please, Sign In to add comment