Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import socket
import subprocess
import sys
import threading

import paramiko
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
class SecureConnectionManager:
    """Accept one TCP client and exchange AES-CFB encrypted text with it.

    The AES key is derived with PBKDF2-HMAC-SHA256 from ``password`` and a
    16-byte salt that the client sends immediately after connecting.
    """

    # Number of salt bytes the client must send before any other traffic.
    SALT_SIZE = 16

    # NOTE(review): the IV is a fixed all-zero block, so two messages
    # encrypted under the same key share a keystream prefix. A random
    # per-message IV would fix this but changes the wire format, so it has
    # to be coordinated with the peer implementation -- confirm and migrate.
    _IV = b'\0' * 16

    def __init__(self, host, port, password):
        """Store the listen address and the password used for key derivation."""
        self.host = host
        self.port = port
        self.password = password

    def establish_connection(self):
        """Listen on ``(host, port)``, accept one client and derive the key.

        Returns:
            A ``(client_socket, shared_key)`` tuple.

        Raises:
            OSError: if binding, listening or accepting fails.
            ConnectionError: if the client disconnects before sending the
                full salt.
        """
        # The listening socket is only needed until the single client is
        # accepted; the context manager closes it instead of leaking it.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            server_socket.bind((self.host, self.port))
            server_socket.listen(1)
            client_socket, _ = server_socket.accept()
        try:
            shared_key = self.derive_shared_key(client_socket)
        except Exception:
            # Do not leak the accepted socket when the handshake fails.
            client_socket.close()
            raise
        return client_socket, shared_key

    @staticmethod
    def _recv_exact(sock, size):
        """Read exactly ``size`` bytes from ``sock``.

        ``recv`` may return fewer bytes than requested, so keep reading until
        the full amount has arrived.

        Raises:
            ConnectionError: if the peer closes the connection early.
        """
        chunks = []
        remaining = size
        while remaining:
            chunk = sock.recv(remaining)
            if not chunk:
                raise ConnectionError(
                    f"connection closed after {size - remaining} of {size} bytes"
                )
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def derive_shared_key(self, client_socket):
        """Read the salt from the client and return the derived 32-byte key.

        Raises:
            ConnectionError: if the client disconnects before sending
                ``SALT_SIZE`` bytes.
        """
        salt = self._recv_exact(client_socket, self.SALT_SIZE)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            iterations=100000,
            salt=salt,
            length=32,
            backend=default_backend(),
        )
        return kdf.derive(self.password.encode())

    def _cipher(self, key):
        """Build the AES-CFB cipher shared by ``encrypt`` and ``decrypt``."""
        return Cipher(algorithms.AES(key), modes.CFB(self._IV), backend=default_backend())

    def encrypt(self, data, key):
        """Encode the string ``data`` and return it encrypted as bytes."""
        encryptor = self._cipher(key).encryptor()
        return encryptor.update(data.encode()) + encryptor.finalize()

    def decrypt(self, encrypted_data, key):
        """Decrypt ``encrypted_data`` (bytes) and return the decoded string."""
        decryptor = self._cipher(key).decryptor()
        return (decryptor.update(encrypted_data) + decryptor.finalize()).decode()

    def send_command(self, client_socket, command, key):
        """Encrypt ``command`` and send all of it over ``client_socket``."""
        # sendall() retries until every byte is written; plain send() may
        # transmit only part of the ciphertext.
        client_socket.sendall(self.encrypt(command, key))

    def receive_output(self, client_socket, key):
        """Receive one chunk (at most 4096 bytes) and return it decrypted."""
        # NOTE(review): there is no message framing, so a response longer
        # than 4096 bytes or split across TCP segments is truncated here.
        encrypted_response = client_socket.recv(4096)
        return self.decrypt(encrypted_response, key)

    def close_connection(self, client_socket):
        """Close ``client_socket``."""
        client_socket.close()
class AdminTool:
    """Static helpers that run local commands and return their text output.

    Each helper returns a string: the command's standard output on success,
    or a descriptive message when the command fails or cannot be started.
    """

    @staticmethod
    def _run(argv, error_prefix):
        """Run ``argv`` without a shell and return its standard output.

        Failures are reported as ``"<error_prefix>: <details>"`` instead of
        being raised, so every public helper has the same contract.
        """
        try:
            return subprocess.check_output(argv, universal_newlines=True)
        except subprocess.CalledProcessError as e:
            return f"{error_prefix}: {e.output}"
        except OSError as e:
            # The executable is missing or could not be started.
            return f"{error_prefix}: {e}"

    @staticmethod
    def execute_command(command):
        """Run ``command`` through the system shell and return its output.

        Standard error is merged into the returned text. A non-zero exit
        status yields ``"Error: <output>"``.
        """
        # NOTE(review): shell=True is deliberate because ``command`` is a
        # full shell line; it must never be built from untrusted input.
        try:
            output = subprocess.check_output(
                command, shell=True, stderr=subprocess.STDOUT, universal_newlines=True
            )
        except subprocess.CalledProcessError as e:
            output = f"Error: {e.output}"
        return output

    @staticmethod
    def handle_advanced_logic(command):
        """Run the file search when ``command`` contains ``"search_files"``.

        Any other command returns ``"Invalid advanced command."``.
        """
        if "search_files" not in command:
            return "Invalid advanced command."
        # A failing ``find`` (e.g. a missing search root) is reported as
        # text, like the sibling helpers, instead of raising to the caller.
        return AdminTool._run(
            ["find", "/path/to/search", "-name", "*.txt"],
            "Error searching files",
        )

    @staticmethod
    def harvest_device_data():
        """Run ``system_info_script.py`` and return what it prints."""
        # sys.executable is the interpreter running this tool; a bare
        # "python" may be absent from PATH or refer to another version.
        return AdminTool._run(
            [sys.executable, "system_info_script.py"],
            "Error harvesting device data",
        )

    @staticmethod
    def system_info():
        """Return the output of the ``systeminfo`` command."""
        # No shell is needed to start a single executable.
        return AdminTool._run(["systeminfo"], "Error retrieving system information")

    @staticmethod
    def run_custom_script(script_path):
        """Run the Python script at ``script_path`` and return its output."""
        # With shell=True and a list, POSIX passes script_path to the shell
        # rather than to python, so the script never ran. Run it directly.
        return AdminTool._run([sys.executable, script_path], "Error running custom script")
def main():
    """Accept one client, run the menu option chosen on stdin, send the result.

    The chosen action's text output is encrypted, sent to the connected
    client and echoed locally. Errors are reported on stdout.
    """
    # NOTE(review): placeholder secret committed in source; load it from
    # configuration or the environment before real use.
    password = "your_super_secret_password"
    connection_manager = SecureConnectionManager("127.0.0.1", 8000, password)

    # Menu key -> (description, zero-argument action returning the response).
    options = {
        "1": ("Run System Command",
              lambda: AdminTool.execute_command("echo 'Hello, World!'")),
        "2": ("Advanced Logic",
              lambda: AdminTool.handle_advanced_logic("search_files")),
        "3": ("Harvest Device Data", AdminTool.harvest_device_data),
        "4": ("System Information", AdminTool.system_info),
        "5": ("Run Custom Script",
              lambda: AdminTool.run_custom_script("custom_script.py")),
    }

    try:
        client_socket, shared_key = connection_manager.establish_connection()
        try:
            for key, (description, _) in options.items():
                print(f"{key}. {description}")
            choice = input("Select an option: ")
            if choice in options:
                response = options[choice][1]()
                connection_manager.send_command(client_socket, response, shared_key)
                print(f"Response: {response}")
            else:
                # Tell the operator instead of silently doing nothing.
                print(f"Invalid option: {choice!r}")
        finally:
            # Close the client socket even when an action or the send fails.
            connection_manager.close_connection(client_socket)
    except Exception as e:
        print(f"Error in main execution: {e}")


if __name__ == '__main__':
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement