Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- # tested with python3 only !!
- import os
- import time
- import pwd
- from datetime import datetime
- import threading
- def get_process_info(pid):
- try:
- with open(f"/proc/{pid}/status") as f:
- status_data = f.read()
- with open(f"/proc/{pid}/cmdline") as f:
- cmdline_data = f.read().replace("\0", " ").strip()
- status_lines = status_data.splitlines()
- process_info = {"pid": pid}
- for line in status_lines:
- if line.startswith("Name:"):
- process_info["name"] = line.split(":", 1)[1].strip()
- elif line.startswith("Uid:"):
- uid = int(line.split(":", 1)[1].strip().split()[0])
- process_info["username"] = pwd.getpwuid(uid).pw_name
- process_info["cmdline"] = cmdline_data
- return process_info
- except FileNotFoundError:
- return None
- def get_current_processes():
- process_list = []
- try:
- for pid in os.listdir("/proc"):
- if pid.isdigit():
- process_info = get_process_info(pid)
- if process_info:
- process_list.append(process_info)
- except:
- return 0
- return process_list
- def check_new_processes(prev_processes, current_processes):
- new_processes = [p for p in current_processes if p not in prev_processes]
- return new_processes
- def print_process(process_info):
- current_time = datetime.now().strftime("%H:%M:%S")
- print(
- f"{current_time} "
- f"{process_info['pid']:<6} "
- f"{process_info['username']:<10} "
- f"{process_info['name']:<30} "
- f"{process_info['cmdline']:<60}"
- )
- def worker_thread():
- global prev_processes
- global stop_threads
- while not stop_threads:
- current_processes = get_current_processes()
- if current_processes == 0:
- pass
- else:
- new_processes = check_new_processes(prev_processes, current_processes)
- if new_processes:
- for new_process in new_processes:
- print_process(new_process)
- prev_processes = current_processes
- time.sleep(0.0001)
- if __name__ == "__main__":
- try:
- print(f"{'TIME':<19} {'PID':<6} {'USER':<10} {'NAME':<30} {'CMDLINE':<60}")
- print("-" * 125)
- global prev_processes
- global stop_threads
- stop_threads = False
- prev_processes = get_current_processes()
- for process in prev_processes:
- print_process(process)
- worker = threading.Thread(target=worker_thread)
- worker.start()
- except KeyboardInterrupt:
- print("\nProcess monitor stopped.")
- stop_threads = True
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement