# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
#
from __future__ import annotations
import importlib
from dataclasses import dataclass, MISSING
from math import prod
from typing import Optional, Type
import torch
from tensordict import TensorDictBase
from torch import nn, Tensor
from benchmarl.models.common import Model, ModelConfig
_has_torch_geometric = importlib.util.find_spec("torch_geometric") is not None
if _has_torch_geometric:
import torch_geometric
# Graph topology names accepted by ``Gnn``; ``Gnn._perform_checks`` raises a
# ``ValueError`` for any value that is not in this set.
TOPOLOGY_TYPES = {"full", "empty"}
def _get_edge_index(topology: str, self_loops: bool, n_agents: int, device: str):
    """Build the edge index of the agent graph for the requested topology.

    Args:
        topology (str): ``"full"`` connects every pair of distinct agents,
            ``"empty"`` connects no pair of distinct agents.
        self_loops (bool): whether every agent is also connected to itself.
        n_agents (int): number of nodes in the graph.
        device (str): device on which the returned tensor is allocated.

    Returns:
        A ``torch.long`` tensor of shape ``[2, n_edges]``. Row 0 holds the row
        indices and row 1 the column indices of the non-zero entries of the
        ``n_agents x n_agents`` adjacency matrix, in row-major order.

    Raises:
        ValueError: if ``topology`` is not one of the supported options.
    """
    if topology == "full":
        adjacency = torch.ones(n_agents, n_agents, device=device, dtype=torch.long)
    elif topology == "empty":
        # An empty topology has no agent-to-agent edges, hence zeros (not ones).
        adjacency = torch.zeros(n_agents, n_agents, device=device, dtype=torch.long)
    else:
        raise ValueError(
            f"Got topology: {topology} but only available options are {TOPOLOGY_TYPES}"
        )

    # Self loops are exactly the diagonal of the adjacency matrix. Writing the
    # diagonal explicitly guarantees one self loop per agent when requested
    # (never duplicated, and present even when there are no other edges) and
    # none otherwise.
    adjacency.fill_diagonal_(1 if self_loops else 0)

    # Dense adjacency -> COO edge index of shape [2, n_edges].
    return adjacency.nonzero().t().contiguous()
class Gnn(Model):
    """A GNN model.

    GNN models can be used as "decentralized" actors or critics.

    Args:
        topology (str): Topology of the graph adjacency matrix. Options: "full", "empty".
        self_loops (bool): Whether the resulting adjacency matrix will have self loops.
        gnn_class (Type[torch_geometric.nn.MessagePassing]): the gnn convolution class to use
        gnn_kwargs (dict, optional): the dict of arguments to pass to the gnn conv class.
            ``in_channels`` and ``out_channels`` are always set from the input and output
            specs and override any value given here. The dict passed in is not modified.

    Examples:

        .. code-block:: python

            import torch_geometric
            from torch import nn
            from benchmarl.algorithms import IppoConfig
            from benchmarl.environments import VmasTask
            from benchmarl.experiment import Experiment, ExperimentConfig
            from benchmarl.models import SequenceModelConfig, GnnConfig, MlpConfig

            experiment = Experiment(
                algorithm_config=IppoConfig.get_from_yaml(),
                model_config=GnnConfig(
                    topology="full",
                    self_loops=False,
                    gnn_class=torch_geometric.nn.conv.GATv2Conv,
                    gnn_kwargs={},
                ),
                critic_model_config=SequenceModelConfig(
                    model_configs=[
                        MlpConfig(num_cells=[8], activation_class=nn.Tanh, layer_class=nn.Linear),
                        GnnConfig(
                            topology="full",
                            self_loops=False,
                            gnn_class=torch_geometric.nn.conv.GraphConv,
                        ),
                        MlpConfig(num_cells=[6], activation_class=nn.Tanh, layer_class=nn.Linear),
                    ],
                    intermediate_sizes=[5,3],
                ),
                seed=0,
                config=ExperimentConfig.get_from_yaml(),
                task=VmasTask.NAVIGATION.get_from_yaml(),
            )
            experiment.run()

    """

    def __init__(
        self,
        topology: str,
        self_loops: bool,
        gnn_class: Type[torch_geometric.nn.MessagePassing],
        gnn_kwargs: Optional[dict] = None,
        **kwargs,
    ):
        # These are assigned before ``super().__init__`` because
        # ``_perform_checks`` reads ``self.topology``.
        # NOTE(review): presumably the base constructor invokes
        # ``_perform_checks`` -- confirm against ``Model``.
        self.topology = topology
        self.self_loops = self_loops

        super().__init__(**kwargs)

        # Per-agent feature sizes are the last dimension of the leaf specs.
        self.input_features = self.input_leaf_spec.shape[-1]
        self.output_features = self.output_leaf_spec.shape[-1]

        # Work on a copy: the caller's dict (e.g. the one stored in a config
        # object) may be reused to build other models with different sizes,
        # so it must not be updated in place.
        gnn_kwargs = dict(gnn_kwargs) if gnn_kwargs is not None else {}
        gnn_kwargs.update(
            {"in_channels": self.input_features, "out_channels": self.output_features}
        )

        # One convolution per agent, or a single shared one when parameters
        # are shared across agents.
        self.gnns = nn.ModuleList(
            [
                gnn_class(**gnn_kwargs).to(self.device)
                for _ in range(self.n_agents if not self.share_params else 1)
            ]
        )
        # Edge index of a single (un-batched) agent graph; it is tiled over the
        # batch at every forward call.
        self.edge_index = _get_edge_index(
            topology=self.topology,
            self_loops=self.self_loops,
            device=self.device,
            n_agents=self.n_agents,
        )

    def _perform_checks(self):
        """Validate the topology and the input/output specs.

        Raises:
            ValueError: if the topology is unknown, if the model is
                centralised, if the input lacks the agent dimension, or if the
                agent dimension of the input/output specs does not match
                ``n_agents``.
        """
        super()._perform_checks()

        if self.topology not in TOPOLOGY_TYPES:
            raise ValueError(
                f"Got topology: {self.topology} but only available options are {TOPOLOGY_TYPES}"
            )
        if self.centralised:
            raise ValueError("GNN model can only be used in non-centralised critics")
        if not self.input_has_agent_dim:
            # Adjacent string literals are concatenated verbatim, so each
            # fragment carries its own trailing space.
            raise ValueError(
                "The GNN module is not compatible with input that does not have the agent dimension, "
                "such as the global state in centralised critics. Please choose another critic model "
                "if your algorithm has a centralized critic and the task has a global state."
            )
        if self.input_leaf_spec.shape[-2] != self.n_agents:
            raise ValueError(
                "The second to last input spec dimension should be the number of agents"
            )
        if (
            self.output_has_agent_dim
            and self.output_leaf_spec.shape[-2] != self.n_agents
        ):
            raise ValueError(
                "If the GNN output has the agent dimension,"
                " the second to last spec dimension should be the number of agents"
            )

    def _forward(self, tensordict: TensorDictBase) -> TensorDictBase:
        """Run the GNN on the entry at ``in_key`` and write the result to ``out_key``.

        The input is read as ``(*batch, n_agents, input_features)`` and the
        output is written as ``(*batch, n_agents, output_features)``.

        Args:
            tensordict (TensorDictBase): container holding the input at ``self.in_key``.

        Returns:
            The same ``tensordict`` with ``self.out_key`` set.
        """
        # Gather in_key
        x = tensordict.get(self.in_key)
        batch_size = x.shape[:-2]

        # Flatten all batch dimensions into one big disconnected graph.
        graph = batch_from_dense_to_ptg(x=x, edge_index=self.edge_index)
        out_shape = (*batch_size, self.n_agents, self.output_features)

        if self.share_params:
            res = self.gnns[0](graph.x, graph.edge_index).view(out_shape)
        else:
            # Every agent-specific GNN processes the whole graph and only the
            # output row of its own agent is kept. The agent axis is the
            # second to last one, so it is indexed with ``[..., i, :]``, which
            # is correct for any number of leading batch dimensions.
            res = torch.stack(
                [
                    gnn(graph.x, graph.edge_index).view(out_shape)[..., i, :]
                    for i, gnn in enumerate(self.gnns)
                ],
                dim=-2,
            )

        tensordict.set(self.out_key, res)
        return tensordict
# class GnnKernel(nn.Module):
# def __init__(self, in_dim, out_dim, **cfg):
# super().__init__()
#
# gnn_types = {"GraphConv", "GATv2Conv", "GINEConv"}
# aggr_types = {"add", "mean", "max"}
#
# self.aggr = "add"
# self.gnn_type = "GraphConv"
#
# self.in_dim = in_dim
# self.out_dim = out_dim
# self.activation_fn = nn.Tanh
#
# if self.gnn_type == "GraphConv":
# self.gnn = GraphConv(
# self.in_dim,
# self.out_dim,
# aggr=self.aggr,
# )
# elif self.gnn_type == "GATv2Conv":
# # Default adds self loops
# self.gnn = GATv2Conv(
# self.in_dim,
# self.out_dim,
# edge_dim=self.edge_features,
# fill_value=0.0,
# share_weights=True,
# add_self_loops=True,
# aggr=self.aggr,
# )
# elif self.gnn_type == "GINEConv":
# self.gnn = GINEConv(
# nn=nn.Sequential(
# torch.nn.Linear(self.in_dim, self.out_dim),
# self.activation_fn(),
# ),
# edge_dim=self.edge_features,
# aggr=self.aggr,
# )
#
# def forward(self, x, edge_index):
# out = self.gnn(x, edge_index)
# return out
def batch_from_dense_to_ptg(
    x: Tensor,
    edge_index: Tensor,
) -> torch_geometric.data.Batch:
    """Pack a dense batch of per-agent features into a single PyG ``Batch``.

    Every element of the (flattened) batch becomes an independent graph that
    shares the same connectivity ``edge_index``.

    Args:
        x (Tensor): node features, read as ``(*batch, n_agents, n_features)``.
        edge_index (Tensor): edge index of one graph, of shape ``[2, n_edges]``,
            with node ids in ``[0, n_agents)``.

    Returns:
        A ``torch_geometric.data.Batch`` whose tensors live on ``x.device``:
        ``x`` of shape ``[batch_size * n_agents, n_features]``, ``edge_index``
        of shape ``[2, batch_size * n_edges]``, plus ``batch`` and ``ptr``.
    """
    device = x.device
    batch_size = prod(x.shape[:-2])
    n_agents = x.shape[-2]

    # ``reshape`` instead of ``view`` so that non-contiguous inputs also work.
    x = x.reshape(-1, x.shape[-1])
    # The offsets below are created on ``x.device``; make sure the edge index
    # is there as well so the sum cannot fail with a device mismatch.
    edge_index = edge_index.to(device)

    b = torch.arange(batch_size, device=device)

    graphs = torch_geometric.data.Batch()
    # Start offset of each graph in the flattened node dimension.
    graphs.ptr = torch.arange(
        0, (batch_size + 1) * n_agents, n_agents, device=device
    )
    # Graph id of every node.
    graphs.batch = torch.repeat_interleave(b, n_agents)
    graphs.x = x
    graphs.edge_attr = None

    n_edges = edge_index.shape[1]
    # Tensor of shape [batch_size * n_edges]
    # in which edges corresponding to the same graph have the same index.
    batch = torch.repeat_interleave(b, n_edges)
    # Edge index for the batched graphs of shape [2, n_edges * batch_size]
    # we sum to each batch an offset of batch_num * n_agents to make sure that
    # the adjacency matrices remain independent
    graphs.edge_index = edge_index.repeat(1, batch_size) + batch * n_agents

    # Every tensor above was created on ``x.device``, so no final move is needed.
    return graphs
@dataclass
class GnnConfig(ModelConfig):
    """Dataclass config for a :class:`~benchmarl.models.Gnn`."""

    # The fields mirror the constructor arguments of ``Gnn``.
    # ``MISSING`` marks a field that has no default value.
    topology: str = MISSING
    self_loops: bool = MISSING
    gnn_class: Type[torch_geometric.nn.MessagePassing] = MISSING
    gnn_kwargs: Optional[dict] = None

    @staticmethod
    def associated_class():
        """Return the model class associated with this config (``Gnn``)."""
        return Gnn