您最多选择25个主题
主题必须以中文或者字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
127 行
4.8 KiB
127 行
4.8 KiB
from mlagents_envs.side_channel import SideChannel, OutgoingMessage, IncomingMessage
|
|
from mlagents_envs.exception import (
|
|
UnityCommunicationException,
|
|
UnitySideChannelException,
|
|
)
|
|
import uuid
|
|
from typing import NamedTuple, Optional
|
|
from enum import IntEnum
|
|
|
|
|
|
class EngineConfig(NamedTuple):
|
|
width: Optional[int]
|
|
height: Optional[int]
|
|
quality_level: Optional[int]
|
|
time_scale: Optional[float]
|
|
target_frame_rate: Optional[int]
|
|
capture_frame_rate: Optional[int]
|
|
|
|
@staticmethod
|
|
def default_config():
|
|
return EngineConfig(80, 80, 1, 20.0, -1, 60)
|
|
|
|
|
|
class EngineConfigurationChannel(SideChannel):
|
|
"""
|
|
This is the SideChannel for engine configuration exchange. The data in the
|
|
engine configuration is as follows :
|
|
- int width;
|
|
- int height;
|
|
- int qualityLevel;
|
|
- float timeScale;
|
|
- int targetFrameRate;
|
|
- int captureFrameRate;
|
|
"""
|
|
|
|
class ConfigurationType(IntEnum):
|
|
SCREEN_RESOLUTION = 0
|
|
QUALITY_LEVEL = 1
|
|
TIME_SCALE = 2
|
|
TARGET_FRAME_RATE = 3
|
|
CAPTURE_FRAME_RATE = 4
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__(uuid.UUID("e951342c-4f7e-11ea-b238-784f4387d1f7"))
|
|
|
|
def on_message_received(self, msg: IncomingMessage) -> None:
|
|
"""
|
|
Is called by the environment to the side channel. Can be called
|
|
multiple times per step if multiple messages are meant for that
|
|
SideChannel.
|
|
Note that Python should never receive an engine configuration from
|
|
Unity
|
|
"""
|
|
raise UnityCommunicationException(
|
|
"The EngineConfigurationChannel received a message from Unity, "
|
|
+ "this should not have happened."
|
|
)
|
|
|
|
def set_configuration_parameters(
|
|
self,
|
|
width: Optional[int] = None,
|
|
height: Optional[int] = None,
|
|
quality_level: Optional[int] = None,
|
|
time_scale: Optional[float] = None,
|
|
target_frame_rate: Optional[int] = None,
|
|
capture_frame_rate: Optional[int] = None,
|
|
) -> None:
|
|
"""
|
|
Sets the engine configuration. Takes as input the configurations of the
|
|
engine.
|
|
:param width: Defines the width of the display. (Must be set alongside height)
|
|
:param height: Defines the height of the display. (Must be set alongside width)
|
|
:param quality_level: Defines the quality level of the simulation.
|
|
:param time_scale: Defines the multiplier for the deltatime in the
|
|
simulation. If set to a higher value, time will pass faster in the
|
|
simulation but the physics might break.
|
|
:param target_frame_rate: Instructs simulation to try to render at a
|
|
specified frame rate.
|
|
:param capture_frame_rate: Instructs the simulation to consider time between
|
|
updates to always be constant, regardless of the actual frame rate.
|
|
"""
|
|
|
|
if (width is None and height is not None) or (
|
|
width is not None and height is None
|
|
):
|
|
raise UnitySideChannelException(
|
|
"You cannot set the width/height of the screen resolution without also setting the height/width"
|
|
)
|
|
|
|
if width is not None and height is not None:
|
|
screen_msg = OutgoingMessage()
|
|
screen_msg.write_int32(self.ConfigurationType.SCREEN_RESOLUTION)
|
|
screen_msg.write_int32(width)
|
|
screen_msg.write_int32(height)
|
|
super().queue_message_to_send(screen_msg)
|
|
|
|
if quality_level is not None:
|
|
quality_level_msg = OutgoingMessage()
|
|
quality_level_msg.write_int32(self.ConfigurationType.QUALITY_LEVEL)
|
|
quality_level_msg.write_int32(quality_level)
|
|
super().queue_message_to_send(quality_level_msg)
|
|
|
|
if time_scale is not None:
|
|
time_scale_msg = OutgoingMessage()
|
|
time_scale_msg.write_int32(self.ConfigurationType.TIME_SCALE)
|
|
time_scale_msg.write_float32(time_scale)
|
|
super().queue_message_to_send(time_scale_msg)
|
|
|
|
if target_frame_rate is not None:
|
|
target_frame_rate_msg = OutgoingMessage()
|
|
target_frame_rate_msg.write_int32(self.ConfigurationType.TARGET_FRAME_RATE)
|
|
target_frame_rate_msg.write_int32(target_frame_rate)
|
|
super().queue_message_to_send(target_frame_rate_msg)
|
|
|
|
if capture_frame_rate is not None:
|
|
capture_frame_rate_msg = OutgoingMessage()
|
|
capture_frame_rate_msg.write_int32(
|
|
self.ConfigurationType.CAPTURE_FRAME_RATE
|
|
)
|
|
capture_frame_rate_msg.write_int32(capture_frame_rate)
|
|
super().queue_message_to_send(capture_frame_rate_msg)
|
|
|
|
def set_configuration(self, config: EngineConfig) -> None:
|
|
"""
|
|
Sets the engine configuration. Takes as input an EngineConfig.
|
|
"""
|
|
self.set_configuration_parameters(**config._asdict())
|