Building a (Very Simple) Autonomous Agent and Environment

*Before reading and following along with this post, it might be helpful to go read this post on agents, environments, and Markov decision processes. Also, the code below is found in Maxim Lapan's excellent book, Deep Reinforcement Learning Hands-On, cited fully at the end of the post. His code, without my commentary, can be found here.*


As a beginning exercise in my ongoing series on implementing Reinforcement Learning models, we'll define a simplistic environment that gives the agent rewards for a fixed number of steps, regardless of the actions it takes.

First, a quick review:

Agent: Some piece of code that implements some policy. Given observations, the policy dictates what action the agent takes at each timestep.

Environment: The model of the world, external to the agent. It provides observations, gives rewards, and changes state based on the agent's actions.
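
To make these roles concrete, a single step of the agent-environment loop looks roughly like the sketch below (choose() is a hypothetical placeholder for the agent's policy; the actual classes are built step by step in the rest of this post):

obs = env.get_observation()  # The environment hands the agent an observation
action = agent.choose(obs)   # The policy maps the observation to an action (hypothetical method)
reward = env.action(action)  # The environment changes state and returns a reward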

Defining our environment

We'll start by initializing the internal state of the environment, which is simply a counter that limits the total number of steps the agent is allowed to take.

In [1]:
class Environment:
  def __init__(self):
    self.steps_left = 10

Let's now define get_observation(), which returns the current observation of the environment to the agent. Normally it would be implemented as a function of the environment's internal state, but in our case the observation vector is always zero.

In [2]:
def get_observation(self):
  return [0.0, 0.0, 0.0]
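
To illustrate what a state-dependent observation could look like, here is a hypothetical variant (my own sketch, not part of Lapan's code) that exposes how much of the episode remains:

def get_observation(self):
  return [self.steps_left / 10.0, 0.0, 0.0] # Fraction of the 10 allowed steps still remaining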

Next, we'll define get_actions(), which returns the set of actions from which the agent can choose. For this model, there are only two actions:

In [3]:
def get_actions(self):
  return [0, 1]

As our agent takes actions within the environment, it performs a series of steps called an episode. We need to define when an episode is over, so the agent knows when it can no longer interact with the environment:

In [4]:
def is_done(self):
  return self.steps_left == 0

The action() method is perhaps the most important piece of the environment. It handles the agent's action (thereby changing the environment's state) and returns a reward for that action. In this case, the reward is random and the action has no effect on the environment. The method also decrements the steps_left counter; once it reaches zero, is_done() returns True and the episode ends.

In [5]:
def action(self, action):
  if self.is_done():
    raise Exception("Game is over")
  self.steps_left -= 1
  return random.random()

Defining our agent

We'll begin by initializing our agent along with a running total that will store the reward as it accumulates:

In [6]:
class Agent:
  def __init__(self):
    self.total_reward = 0.0

Now, we'll define a step() function that accepts our environment as an argument, allowing the agent to do a few things:

  • Observe the state of the environment via env.get_observation(), storing it in current_obs
  • Decide which action to perform, choosing randomly from env.get_actions()
  • Pass the chosen action to the environment via env.action() and receive a reward
  • Add that reward to the accumulated total

In [7]:
def step(self, env):
  current_obs = env.get_observation() # Observe the environment
  actions = env.get_actions() # Get available actions
  reward = env.action(random.choice(actions)) # Perform the action, get a reward
  self.total_reward += reward # Add reward to total

A quick review:

So, we've done quite a bit here, but as anyone following along has probably noticed, the code snippets above don't do much until they are combined correctly and run through a procedure that creates the classes and plays out an episode. Before we do that, let's review what we've done:

  • We created classes for both our environment and our agent
  • We defined our observation vector, allowing our agent to see the environment in its current state
  • We defined the action space for our agent in the environment
  • We set the rules for when an episode ends
  • We defined what happens to the environment (nothing, in this case) and what rewards are given when our agent takes an action
  • We defined what happens at each step in an episode

Whew, a lot for a simple environment! Let's put it all together and run the code:

In [8]:
import random

class Environment:
  def __init__(self):
    self.steps_left = 10
    
  def get_observation(self):
    return [0.0, 0.0, 0.0] # Observation vector is always zero

  def get_actions(self):
    return [0, 1] # Two possible actions

  def is_done(self):
    return self.steps_left == 0 # Episode ends when no steps remain

  def action(self, action):
    if self.is_done():
      raise Exception("Game is over")
    self.steps_left -= 1 # Use up one of the remaining steps
    return random.random() # Random reward, regardless of the action
  
class Agent:
  def __init__(self):
    self.total_reward = 0.0
    
  def step(self, env):
    current_obs = env.get_observation() # Observe the environment
    actions = env.get_actions() # Get available actions
    reward = env.action(random.choice(actions)) # Perform the action, get a reward
    self.total_reward += reward # Add reward to total
  
if __name__ == "__main__":
  env = Environment()
  agent = Agent()
  
  while not env.is_done():
    agent.step(env)
    
  print("Total reward collected: %.4f" % agent.total_reward)
Total reward collected: 3.8136
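
Since each step's reward comes from random.random() (uniform on [0, 1)) and an episode lasts 10 steps, the total reward for a single episode should hover around 5. As a quick sanity check (my own addition, not part of Lapan's code), we can reuse the classes above to run many episodes and average the totals:

rewards = []
for _ in range(1000):
  env = Environment()
  agent = Agent()
  while not env.is_done():
    agent.step(env)
  rewards.append(agent.total_reward) # Record this episode's total reward

print("Average total reward: %.4f" % (sum(rewards) / len(rewards)))

The printed average should land very close to 5.0, confirming that a single episode's total (like the 3.8136 above) is just one random draw around that mean.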

Sources

Lapan, M. (2018). *Deep reinforcement learning hands-on: Apply modern RL methods, with deep Q-networks, value iteration, policy gradients, TRPO, AlphaGo Zero and more*. Packt Publishing.
