- 积分
- 8481
- 回帖
- 0
- 西莫币
-
- 贡献
-
- 威望
-
- 存款
-
- 阅读权限
- 70
- 最后登录
- 1970-1-1
签到天数: 21 天 连续签到: 2 天 [LV.4]偶尔看看III
|
楼主 |
发表于 2024-7-3 08:33
|
显示全部楼层
然后,我们构建D3QN算法( DuelingDoubleDeepQNetwork ),并由此构建一个agent(Agent):
==================== 我仍然是一个华丽的分割线 ==============================
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import tensorflow.keras as keras
from tensorflow.python.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from replay_buffer import ReplayBuffer
class DuelingDoubleDeepQNetwork(keras.Model):
def __init__(self, num_actions, fc1, fc2):
super(DuelingDoubleDeepQNetwork, self).__init__()
self.dense1 = Dense(fc1, activation='relu')
self.dense2 = Dense(fc2, activation='relu')
self.V = Dense(1, activation=None)
self.A = Dense(num_actions, activation=None)
def call(self, state):
x = self.dense1(state)
x = self.dense2(x)
V = self.V(x)
A = self.A(x)
avg_A = tf.math.reduce_mean(A, axis=1, keepdims=True)
Q = (V + (A - avg_A))
return Q, A
class Agent:
def __init__(self, lr, discount_factor, num_actions, epsilon, batch_size, input_dim):
self.action_space = [i for i in range(num_actions)]
self.discount_factor = discount_factor
self.epsilon = epsilon
self.batch_size = batch_size
self.epsilon_decay = 0.001
self.epsilon_final = 0.01
self.update_rate = 120
self.step_counter = 0
self.buffer = ReplayBuffer(100000, input_dim)
self.q_net = DuelingDoubleDeepQNetwork(num_actions, 128, 128)
self.q_target_net = DuelingDoubleDeepQNetwork(num_actions, 128, 128)
self.q_net.compile(optimizer=Adam(learning_rate=lr), loss='mse')
self.q_target_net.compile(optimizer=Adam(learning_rate=lr), loss='mse')
def store_tuple(self, state, action, reward, new_state, done):
self.buffer.store_tuples(state, action, reward, new_state, done)
def policy(self, observation):
if np.random.random() < self.epsilon:
action = np.random.choice(self.action_space)
else:
state = np.array([observation])
_, actions = self.q_net(state)
action = tf.math.argmax(actions, axis=1).numpy()[0]
return action
def train(self):
if self.buffer.counter < self.batch_size:
return
if self.step_counter % self.update_rate == 0:
self.q_target_net.set_weights(self.q_net.get_weights())
state_batch, action_batch, reward_batch, new_state_batch, done_batch = \
self.buffer.sample_buffer(self.batch_size)
q_predicted, _ = self.q_net(state_batch)
q_next, _ = self.q_target_net(new_state_batch)
q_target = q_predicted.numpy()
_, actions = self.q_net(new_state_batch)
max_actions = tf.math.argmax(actions, axis=1)
for idx in range(done_batch.shape[0]):
q_target[idx, action_batch[idx]] = reward_batch[idx] + self.discount_factor*q_next[idx, max_actions[idx]] *\
(1-int(done_batch[idx]))
self.q_net.train_on_batch(state_batch, q_target)
self.epsilon = self.epsilon - self.epsilon_decay if self.epsilon > self.epsilon_final else self.epsilon_final
self.step_counter += 1
def train_model(self, env, num_episodes, graph):
scores, episodes, avg_scores, obj = [], [], [], []
goal = 200
f = 0
txt = open("saved_networks.txt", "w")
for i in range(num_episodes):
done = False
score = 0.0
state = env.reset()
while not done:
action = self.policy(state)
new_state, reward, done, _ = env.step(action)
score += reward
self.store_tuple(state, action, reward, new_state, done)
state = new_state
self.train()
scores.append(score)
obj.append(goal)
episodes.append(i)
avg_score = np.mean(scores[-100:])
avg_scores.append(avg_score)
print("Episode {0}/{1}, Score: {2} ({3}), AVG Score: {4}".format(i, num_episodes, score, self.epsilon,
avg_score))
if avg_score >= 200.0 and score >= 250:
self.q_net.save(("saved_networks/d3qn_model{0}".format(f)))
self.q_net.save_weights(("saved_networks/d3qn_model{0}/net_weights{0}.h5".format(f)))
txt.write("Save {0} - Episode {1}/{2}, Score: {3} ({4}), AVG Score: {5}\n".format(f, i, num_episodes,
score, self.epsilon,
avg_score))
f += 1
print("Network saved")
txt.close()
if graph:
df = pd.DataFrame({'x': episodes, 'Score': scores, 'Average Score': avg_scores, 'Solved Requirement': obj})
plt.plot('x', 'Score', data=df, marker='', color='blue', linewidth=2, label='Score')
plt.plot('x', 'Average Score', data=df, marker='', color='orange', linewidth=2, linestyle='dashed',
label='AverageScore')
plt.plot('x', 'Solved Requirement', data=df, marker='', color='red', linewidth=2, linestyle='dashed',
label='Solved Requirement')
plt.legend()
plt.savefig('LunarLander_Train.png')
def test(self, env, num_episodes, file_type, file, graph):
if file_type == 'tf':
self.q_net = tf.keras.models.load_model(file)
elif file_type == 'h5':
self.train_model(env, 5, False)
self.q_net.load_weights(file)
self.epsilon = 0.0
scores, episodes, avg_scores, obj = [], [], [], []
goal = 200
score = 0.0
for i in range(num_episodes):
state = env.reset()
done = False
episode_score = 0.0
while not done:
env.render()
action = self.policy(state)
new_state, reward, done, _ = env.step(action)
episode_score += reward
state = new_state
score += episode_score
scores.append(episode_score)
obj.append(goal)
episodes.append(i)
avg_score = np.mean(scores[-100:])
avg_scores.append(avg_score)
if graph:
df = pd.DataFrame({'x': episodes, 'Score': scores, 'Average Score': avg_scores, 'Solved Requirement': obj})
plt.plot('x', 'Score', data=df, marker='', color='blue', linewidth=2, label='Score')
plt.plot('x', 'Average Score', data=df, marker='', color='orange', linewidth=2, linestyle='dashed',
label='AverageScore')
plt.plot('x', 'Solved Requirement', data=df, marker='', color='red', linewidth=2, linestyle='dashed',
label='Solved Requirement')
plt.legend()
plt.savefig('LunarLander_Test.png')
env.close() |
|