import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.distributions as distributions
import matplotlib.pyplot as plt
import numpy as np
import gym
print(gym.__version__)
# Calculate the Shannon entropy (in bits) of a discrete random variable x
def entropy(x):
    try:
        assert len(x) > 0
    except AssertionError:
        raise ValueError("Input array must not be empty.")
    _, counts = np.unique(x, return_counts = True)
    p = counts / len(x)
    return -np.sum(p * np.log2(p))
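# Quick sanity check for entropy() (the sample arrays are assumed here for illustration,
# not part of the original code): a fair binary sequence should give about 1 bit,
# while a constant sequence should give 0 bits.
fair = np.array([0, 1, 0, 1, 0, 1, 0, 1])
constant = np.zeros(8, dtype = int)
print(entropy(fair))      # 1.0 bit for a 50/50 split
print(entropy(constant))  # 0.0 bits for a degenerate distribution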
# Calculate the conditional entropy H(x | y); also returns H(y)
def conditional_entropy(x, y):
    try:
        assert len(x) == len(y)
        assert len(x) > 0
    except AssertionError:
        raise ValueError("Input arrays must have the same length, and must not be empty!")
    n = len(x)
    _, counts = np.unique(y, return_counts = True)
    h_y = np.sum(-(counts / n) * np.log2(counts / n))
    # H(X | Y) = sum over y of p(y) * H(X | Y = y)
    hx_y = 0
    for y_val in np.unique(y):
        x_given_y = x[y == y_val]
        hx_y += (np.sum(y == y_val) / n) * entropy(x_given_y)
    return hx_y, h_y
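# Illustrative check for conditional_entropy() (assumed data, not from the original code):
# when x is fully determined by y, H(x | y) should be 0; an independent x keeps its entropy.
y = np.array([0, 0, 1, 1, 0, 0, 1, 1])
x_copy = y.copy()                              # x fully determined by y
x_indep = np.array([0, 1, 0, 1, 0, 1, 0, 1])   # x unrelated to y
print(conditional_entropy(x_copy, y)[0])   # ~0.0 bits
print(conditional_entropy(x_indep, y)[0])  # ~1.0 bit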
# Calculate the conditional mutual information I(x; y | z)
def information_mutual_conditional(x, y, z):
    try:
        assert len(x) == len(y) == len(z)
        assert len(x) > 0
    except AssertionError:
        raise ValueError("Input arrays must have the same length, and must not be empty!")
    # I(X; Y | Z) = H(X | Z) - H(X | Y, Z), with (Y, Z) encoded as a single joint variable
    hx_z, _ = conditional_entropy(x, z)
    yz = np.array([f"{yi},{zi}" for yi, zi in zip(y, z)])
    hx_yz, _ = conditional_entropy(x, yz)
    return hx_z - hx_yz
# Calculate the mutual information I(x; y)
def information_mutual(x, y):
    try:
        assert len(x) == len(y)
        assert len(x) > 0
    except AssertionError:
        raise ValueError("Input arrays must have the same length and must not be empty!")
    # I(X; Y) = H(X) - H(X | Y)
    h_x = entropy(x)
    hx_y, _ = conditional_entropy(x, y)
    return h_x - hx_y
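# Hedged sanity check for the two mutual-information helpers (sample data assumed for
# illustration): I(x; x) should equal H(x), and conditioning on a copy of y should
# drive I(x; y | z) to zero.
x = np.array([0, 1, 1, 0, 1, 0, 0, 1])
y = x.copy()
z = y.copy()
print(information_mutual(x, x))                 # equals entropy(x)
print(information_mutual_conditional(x, y, z))  # ~0.0, since z already determines y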
class CustomEnv(gym.Env):
    def __init__(self, max_timesteps, n_agents):
        super().__init__()
        # Define environment params
        self.action_space = gym.spaces.Discrete(2)  # binary action space
        self.observation_space = gym.spaces.Box(low = 0, high = 1,
                                                shape = (3, ), dtype = np.int32)
        self.timestep = 0
        self.max_timesteps = max_timesteps
        self.n_agents = n_agents
        # Initialize the system with random past, present, and future states
        # (sized to n_agents so the initial state matches reset())
        self.past = np.random.randint(0, 2, size = n_agents)
        self.present = np.random.randint(0, 2, size = n_agents)
        self.future = np.random.randint(0, 2, size = n_agents)
    def step(self, actions):
        # Shift the system one step: append only the latest n_agents values of each
        # stream so past, present, and future always keep the same length
        self.past = np.append(self.past, self.present[-self.n_agents:], axis = 0)
        self.present = np.append(self.present, self.future[-self.n_agents:], axis = 0)
        self.future = np.append(self.future, actions, axis = 0)
        self.timestep += 1
        # Calculate the Istx measure
        tau = self.present
        s = self.past
        x = self.future
        I_tau_sx = information_mutual_conditional(s, x, tau)
        I_tau_sx_shared = information_mutual(s, x)
        I_tau_sx_excl = I_tau_sx - I_tau_sx_shared
        # Calculate proposed diversity penalty term (drop zero-count actions to avoid log(0))
        action_counts = np.bincount(actions, minlength = 2)
        action_probabilities = action_counts / len(actions)
        action_probabilities = action_probabilities[action_probabilities > 0]
        diversity_penalty = -np.sum(action_probabilities * np.log(action_probabilities))
        # Define reward function
        reward = -I_tau_sx_excl + diversity_penalty
        # Determine if the episode is done or not
        done = (self.timestep >= self.max_timesteps)
        # Return the new state, reward, and done status
        return (self.past[-self.n_agents:], self.present[-self.n_agents:],
                self.future[-self.n_agents:]), reward, done, {}
    def reset(self):
        # Reset the environment to a new initial state
        self.past = np.random.randint(0, 2, size = self.n_agents)
        self.present = np.random.randint(0, 2, size = self.n_agents)
        self.future = np.random.randint(0, 2, size = self.n_agents)
        self.timestep = 0
        return (self.past, self.present, self.future)
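# Small assumed usage sketch (not part of the original code) showing how the environment
# could be exercised with random actions before wiring in a learned policy; the episode
# length and agent count below are arbitrary.
env = CustomEnv(max_timesteps = 5, n_agents = 10)
state = env.reset()
done = False
while not done:
    actions = np.random.randint(0, 2, size = env.n_agents)  # random binary actions
    state, reward, done, info = env.step(actions)
    print(env.timestep, reward)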
def main():
    env = CustomEnv(n_agents = 10, max_timesteps = 1000)
    INPUT_DIM = env.observation_space.shape[0]
    HIDDEN_DIM = 256
    OUTPUT_DIM = env.action_space.n
    print(INPUT_DIM, HIDDEN_DIM, OUTPUT_DIM)
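# The torch imports and the dimensions printed in main() suggest a policy network was
# intended to follow. A minimal sketch of what that could look like, assuming a one-hidden-layer
# MLP with a categorical action head; the MLPPolicy name and structure are assumptions,
# not something stated in the original code.
class MLPPolicy(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, obs):
        h = F.relu(self.fc1(obs))
        logits = self.fc2(h)
        # Sample a discrete action and keep its log-probability for a policy-gradient update
        dist = distributions.Categorical(logits = logits)
        action = dist.sample()
        return action, dist.log_prob(action)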
if __name__ == "__main__":
    main()