# Import necessary libraries
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import cv2
import random
import os
# Config class
class Config:
    num_episodes = 1800
    max_steps_per_episode = 1000
    buffer_capacity = 50000
    batch_size = 32
    gamma = 0.99
    lr = 1e-4
    epsilon_start = 1.0
    epsilon_min = 0.01
    epsilon_decay = 0.995
    target_update_freq = 2000  # measured in environment steps
    save_path = "/content/drive/MyDrive/checkpoints"
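# Note on the schedule above: with epsilon multiplied by 0.995 once per episode,
# exploration drops to about 0.1 after roughly 460 episodes and reaches the 0.01
# floor after about ln(0.01) / ln(0.995), i.e. roughly 919 episodes, well within
# num_episodes = 1800.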
# DQN model
class DQN(nn.Module):
    def __init__(self, input_channels, n_actions):
        super(DQN, self).__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=8, stride=4), nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2), nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1), nn.ReLU()
        )
        self.fc = nn.Sequential(
            nn.Linear(64 * 7 * 7, 512), nn.ReLU(),
            nn.Linear(512, n_actions)
        )

    def forward(self, x):
        x = x / 255.0  # Normalize pixel values to [0, 1]
        x = self.conv(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)
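# Shape check (a minimal sketch, assuming the 84x84 preprocessing used below):
# 84 -> (84-8)/4+1 = 20 -> (20-4)/2+1 = 9 -> (9-3)/1+1 = 7, so the conv stack
# yields a 64x7x7 feature map and the flattened size 64*7*7 matches the first
# nn.Linear layer; e.g. DQN(4, 6)(torch.zeros(1, 4, 84, 84)) has shape (1, 6).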
# Replay buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = []
        self.capacity = capacity

    def push(self, *experience):
        if len(self.buffer) >= self.capacity:
            self.buffer.pop(0)
        self.buffer.append(experience)

    def sample(self, batch_size):
        samples = random.sample(self.buffer, batch_size)
        return map(np.array, zip(*samples))

    def __len__(self):
        return len(self.buffer)
# Frame Preprocessing
def preprocess_frame(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY)
    resized = cv2.resize(gray, (84, 84), interpolation=cv2.INTER_AREA)
    return resized
# Frame Stacking
def stack_frames(frames, frame, is_new_episode=False):
    if is_new_episode:
        frames = [frame] * 4
    else:
        frames = frames[1:] + [frame]
    return np.stack(frames, axis=0), frames
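# Usage note (illustrative, not from the original paste): at the start of an
# episode the same frame is repeated four times, so
#     state, frames = stack_frames([], preprocess_frame(obs), is_new_episode=True)
# returns a (4, 84, 84) array; later calls slide the window forward by one frame.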
# Action Selection
def select_action(state, policy_net, epsilon, n_actions, device):
    if np.random.rand() < epsilon:
        return np.random.randint(n_actions)
    else:
        state = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)
        with torch.no_grad():
            return policy_net(state).argmax().item()
# Save Model
def save_model(policy_net, filename):
    os.makedirs(Config.save_path, exist_ok=True)
    torch.save(policy_net.state_dict(), filename)

# Load Model
def load_model(policy_net, filename):
    if os.path.exists(filename):
        # map_location="cpu" lets a GPU-trained checkpoint load on a CPU-only machine
        policy_net.load_state_dict(torch.load(filename, map_location="cpu"))
        print(f"Model loaded from {filename}")
# Training Function
def train():
    # Assumes the classic Gym step/reset API (obs from env.reset(), 4-tuple from env.step)
    env = gym.make("PongNoFrameskip-v4")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    n_actions = env.action_space.n

    policy_net = DQN(4, n_actions).to(device)
    target_net = DQN(4, n_actions).to(device)
    target_net.load_state_dict(policy_net.state_dict())
    target_net.eval()

    optimizer = optim.Adam(policy_net.parameters(), lr=Config.lr)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10000, gamma=0.9)
    buffer = ReplayBuffer(Config.buffer_capacity)
    epsilon = Config.epsilon_start
    step_count = 0

    for episode in range(Config.num_episodes):
        frame = preprocess_frame(env.reset())
        state, frames = stack_frames([], frame, is_new_episode=True)
        total_reward, done = 0, False

        for _ in range(Config.max_steps_per_episode):
            action = select_action(state, policy_net, epsilon, n_actions, device)
            next_frame, reward, done, *_ = env.step(action)
            next_frame = preprocess_frame(next_frame)
            next_state, frames = stack_frames(frames, next_frame, is_new_episode=False)
            buffer.push(state, action, reward, next_state, done)
            state = next_state
            total_reward += reward
            step_count += 1

            if len(buffer) >= Config.batch_size:
                states, actions, rewards, next_states, dones = buffer.sample(Config.batch_size)
                states = torch.tensor(states, dtype=torch.float32).to(device)
                actions = torch.tensor(actions, dtype=torch.long).to(device)
                rewards = torch.tensor(rewards, dtype=torch.float32).to(device)
                next_states = torch.tensor(next_states, dtype=torch.float32).to(device)
                dones = torch.tensor(dones, dtype=torch.float32).to(device)

                # Bellman target: r + gamma * max_a' Q_target(s', a'), zeroed at episode end
                q_values = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
                next_q_values = target_net(next_states).max(1)[0].detach()
                targets = rewards + Config.gamma * next_q_values * (1 - dones)

                loss = nn.SmoothL1Loss()(q_values, targets)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                scheduler.step()

            # Sync the target network every target_update_freq environment steps
            if step_count % Config.target_update_freq == 0:
                target_net.load_state_dict(policy_net.state_dict())

            if done:
                break

        epsilon = max(Config.epsilon_min, epsilon * Config.epsilon_decay)

        if episode % 100 == 0:
            save_model(policy_net, f"{Config.save_path}/model_episode_{episode}.pth")
        print(f"Episode {episode}, Total Reward: {total_reward}")

    save_model(policy_net, f"{Config.save_path}/final_model.pth")
    print("Training complete. Model saved.")
    env.close()
# Testing Function
def test():
    env = gym.make("PongNoFrameskip-v4")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    n_actions = env.action_space.n

    policy_net = DQN(4, n_actions).to(device)
    load_model(policy_net, f"{Config.save_path}/final_model.pth")
    policy_net.eval()

    frame = preprocess_frame(env.reset())
    state, frames = stack_frames([], frame, is_new_episode=True)
    total_reward, done = 0, False

    while not done:
        env.render()
        # epsilon = 0: always take the greedy (highest Q-value) action
        action = select_action(state, policy_net, 0, n_actions, device)
        next_frame, reward, done, *_ = env.step(action)
        next_frame = preprocess_frame(next_frame)
        state, frames = stack_frames(frames, next_frame, is_new_episode=False)
        total_reward += reward

    print(f"Test complete. Total Reward: {total_reward}")
    env.close()
# Main Execution
if __name__ == "__main__":
    train()
    test()
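# Dependency note (assumption, not part of the original paste): this script targets
# the classic Gym API (gym < 0.26) with the Atari extras installed, e.g.
#     pip install "gym[atari]" ale-py opencv-python torch
# ROM licensing may additionally require AutoROM, e.g.
#     pip install "autorom[accept-rom-license]"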