
Custom Reward Function using "information-theoretic measure of temporal dependency" (Varley, 2023) #61

Open
@kennethZhangML

Description
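The snippet below sketches a gym environment whose reward combines a conditional mutual-information term, intended to approximate the information-theoretic temporal-dependency measure from Varley (2023), with an entropy-based action-diversity term.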

import torch 
import torch.nn as nn 
import torch.nn.functional as F 
import torch.distributions as distributions

import matplotlib.pyplot as plt 
import numpy as np 

import gym
print(gym.__version__)

# Calculate entropy of random variable x
def entropy(x):
    try:
        assert len(x) > 0
    except AssertionError:
        raise ValueError("Input array must not be empty.")
    _, counts = np.unique(x, return_counts = True)
    p = counts / len(x)
    return -np.sum(p * np.log2(p))
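
# Illustrative sanity check (not from the original issue): a balanced binary
# sample has exactly 1 bit of entropy.
assert np.isclose(entropy(np.array([0, 1, 0, 1])), 1.0)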

# Calculate conditional entropy H(X | Y) of x given y (also returns H(Y))
def conditional_entropy(x, y):
    try:
        assert len(x) == len(y)
        assert len(x) > 0
    except AssertionError:
        raise ValueError("Input arrays must have the same length, and must not be empty!")
    n = len(x)
    hy = entropy(y)
    # H(X | Y) = sum over y of p(y) * H(X | Y = y)
    hx_y = 0
    for y_val in np.unique(y):
        x_given_y = x[y == y_val]
        hx_y += (np.sum(y == y_val) / n) * entropy(x_given_y)
    return hx_y, hy
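
# Illustrative sanity check (not from the original issue): when x is a
# deterministic copy of y, H(X | Y) is 0 and H(Y) is 1 bit.
_hx_y, _hy = conditional_entropy(np.array([0, 0, 1, 1]), np.array([0, 0, 1, 1]))
assert np.isclose(_hx_y, 0.0) and np.isclose(_hy, 1.0)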

# Calculate conditional mutual information I(X; Y | Z) between x and y given z
def information_mutual_conditional(x, y, z):
    try:
        assert len(x) == len(y) == len(z)
        assert len(x) > 0
    except AssertionError:
        raise ValueError("Input arrays must have the same length, and must not be empty!")
    hx_z, _ = conditional_entropy(x, z)
    hy_z, _ = conditional_entropy(y, z)
    # Encode each (x, y) pair as a single discrete symbol so the joint
    # conditional entropy H(X, Y | Z) can be computed with the helper above
    xy = np.unique(np.stack([x, y], axis = 1), axis = 0, return_inverse = True)[1].reshape(-1)
    hxy_z, _ = conditional_entropy(xy, z)
    # I(X; Y | Z) = H(X | Z) + H(Y | Z) - H(X, Y | Z)
    return hx_z + hy_z - hxy_z
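
# Illustrative sanity check (not from the original issue): if z carries no
# information (constant), I(X; Y | Z) reduces to I(X; Y), here 1 bit for x == y.
assert np.isclose(
    information_mutual_conditional(np.array([0, 1, 0, 1]),
                                   np.array([0, 1, 0, 1]),
                                   np.zeros(4, dtype = int)), 1.0)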

# Calculate mutual information I(X; Y) between x and y
def information_mutual(x, y):
    try:
        assert len(x) == len(y)
        assert len(x) > 0
    except AssertionError:
        raise ValueError("Input arrays must have the same length and must not be empty!")
    h_x = entropy(x)
    h_y = entropy(y)
    # Encode each (x, y) pair as a single discrete symbol to get the joint entropy H(X, Y)
    xy = np.unique(np.stack([x, y], axis = 1), axis = 0, return_inverse = True)[1].reshape(-1)
    h_xy = entropy(xy)
    # I(X; Y) = H(X) + H(Y) - H(X, Y)
    return h_x + h_y - h_xy
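
# Illustrative sanity check (not from the original issue): the mutual
# information of a variable with itself equals its entropy, I(X; X) = H(X).
_x = np.array([0, 1, 0, 1])
assert np.isclose(information_mutual(_x, _x), entropy(_x))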

class CustomEnv(gym.Env):
    def __init__(self, max_timesteps, n_agents):
        # Define environment params
        self.action_space = gym.spaces.Discrete(2) # binary action space
        self.observation_space = gym.spaces.Box(low = 0, high = 1, 
                                                shape = (3, ), dtype = np.int32)
        self.timestep = 0
        self.max_timesteps = max_timesteps
        self.n_agents = n_agents

        # Initialize the system with random past, present, and future states
        # (one value per agent, matching reset())
        self.past = np.random.randint(0, 2, size = self.n_agents)
        self.present = np.random.randint(0, 2, size = self.n_agents)
        self.future = np.random.randint(0, 2, size = self.n_agents)

    def step(self, actions):
        # Update the system based on the chosen actions: the latest present
        # values are appended to the past, the latest future values to the
        # present, and the actions become the newest future values, so all
        # three arrays stay the same length (assumes len(actions) == n_agents)
        self.past = np.append(self.past, self.present[-self.n_agents:], axis = 0)
        self.present = np.append(self.present, self.future[-self.n_agents:], axis = 0)
        self.future = np.append(self.future, actions, axis = 0)
        self.timestep += 1

        # Calculate the Istx measure
        tau = self.present 
        s = self.past 
        x = self.future 
        I_tau_sx = information_mutual_conditional(s, x, tau)
        I_tau_sx_shared = information_mutual(s, x)
        I_tau_sx_excl = I_tau_sx - I_tau_sx_shared

        # Calculate the proposed diversity term (entropy of the action
        # distribution; zero-count actions are dropped to avoid log(0))
        action_counts = np.bincount(actions, minlength = 2)
        action_probabilities = action_counts[action_counts > 0] / len(actions)
        diversity_penalty = -np.sum(action_probabilities * np.log(action_probabilities))

        # Define reward function
        reward = -I_tau_sx_excl + diversity_penalty

        # Determine if the episode is done or not
        done = (self.timestep >= self.max_timesteps)
        # return the new state, reward, and done status
        return (self.past[-self.n_agents:], self.present[-self.n_agents:], 
                self.future[-self.n_agents:]), reward, done, {}

    def reset(self):
        # Reset the environment to a new initial state
        self.past  = np.random.randint(0, 2, size = self.n_agents)
        self.present = np.random.randint(0, 2, size = self.n_agents)
        self.future = np.random.randint(0, 2, size = self.n_agents)
        self.timestep = 0
        return (self.past, self.present, self.future)

def main():
    env = CustomEnv(n_agents = 10, max_timesteps = 1000)
    
    INPUT_DIM = env.observation_space.shape[0]
    HIDDEN_DIM = 256
    OUTPUT_DIM = env.action_space.n

    print(INPUT_DIM, HIDDEN_DIM, OUTPUT_DIM)

main()
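
# Illustrative rollout sketch (not part of the original issue): drive the
# environment with random binary actions and accumulate the reward. The
# helper name `demo_rollout` is made up for this example.
def demo_rollout(n_agents = 4, max_timesteps = 5):
    env = CustomEnv(max_timesteps = max_timesteps, n_agents = n_agents)
    env.reset()
    total_reward, done = 0.0, False
    while not done:
        actions = np.random.randint(0, 2, size = n_agents)
        _, reward, done, _ = env.step(actions)
        total_reward += reward
    print(f"Total reward over {max_timesteps} steps: {total_reward:.3f}")

# demo_rollout()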
