Source code for sc2.client

from __future__ import annotations

from collections.abc import Iterable
from pathlib import Path
from typing import Any, Literal

from aiohttp import ClientWebSocketResponse
from loguru import logger

from s2clientprotocol import debug_pb2 as debug_pb
from s2clientprotocol import query_pb2 as query_pb
from s2clientprotocol import raw_pb2 as raw_pb
from s2clientprotocol import sc2api_pb2 as sc_pb
from s2clientprotocol import spatial_pb2 as spatial_pb
from sc2.action import combine_actions
from sc2.data import ActionResult, ChatChannel, Race, Result, Status
from sc2.game_data import AbilityData, GameData
from sc2.game_info import GameInfo
from sc2.ids.ability_id import AbilityId
from sc2.ids.unit_typeid import UnitTypeId
from sc2.portconfig import Portconfig
from sc2.position import Point2, Point3
from sc2.protocol import ConnectionAlreadyClosedError, Protocol, ProtocolError
from sc2.renderer import Renderer
from sc2.unit import Unit
from sc2.unit_command import UnitCommand
from sc2.units import Units


[docs] class Client(Protocol): def __init__(self, ws: ClientWebSocketResponse, save_replay_path: str | None = None) -> None: """ :param ws: """ super().__init__(ws) # How many frames will be waited between iterations before the next one is called self.game_step: int = 4 self.save_replay_path: str | None = save_replay_path # The following will be set on join_game() self._player_id: int = None # pyrefly: ignore # The following will be set on leave() self._game_result: dict[int, Result] = None # pyrefly: ignore # Store a hash value of all the debug requests to prevent sending the same ones again if they haven't changed last frame self._debug_hash_tuple_last_iteration: tuple[int, int, int, int] = (0, 0, 0, 0) self._debug_draw_last_frame = False self._debug_texts = [] self._debug_lines = [] self._debug_boxes = [] self._debug_spheres = [] self._renderer = None self.raw_affects_selection = False @property def in_game(self) -> bool: return self._status in {Status.in_game, Status.in_replay} async def join_game( self, name: str | None = None, race: Race | None = None, observed_player_id: int | None = None, portconfig: Portconfig | None = None, rgb_render_config: dict[str, Any] | None = None, ): ifopts = sc_pb.InterfaceOptions( raw=True, score=True, show_cloaked=True, show_burrowed_shadows=True, raw_affects_selection=self.raw_affects_selection, raw_crop_to_playable_area=False, show_placeholders=True, ) if rgb_render_config: assert isinstance(rgb_render_config, dict) assert "window_size" in rgb_render_config and "minimap_size" in rgb_render_config window_size = rgb_render_config["window_size"] minimap_size = rgb_render_config["minimap_size"] self._renderer = Renderer(self, window_size, minimap_size) map_width, map_height = window_size minimap_width, minimap_height = minimap_size ifopts.render.resolution.x = map_width ifopts.render.resolution.y = map_height ifopts.render.minimap_resolution.x = minimap_width ifopts.render.minimap_resolution.y = minimap_height if race is None: assert isinstance(observed_player_id, int), f"observed_player_id is of type {type(observed_player_id)}" # join as observer request = sc_pb.RequestJoinGame(observed_player_id=observed_player_id, options=ifopts) else: assert isinstance(race, Race) request = sc_pb.RequestJoinGame(race=race.value, options=ifopts) # pyrefly: ignore[bad-argument-type] if portconfig: request.server_ports.game_port = portconfig.server[0] request.server_ports.base_port = portconfig.server[1] for ppc in portconfig.players: p = request.client_ports.add() # pyrefly: ignore p.game_port = ppc[0] p.base_port = ppc[1] if name is not None: assert isinstance(name, str), f"name is of type {type(name)}" request.player_name = name result = await self._execute(join_game=request) self._game_result = None # pyrefly: ignore self._player_id = result.join_game.player_id return result.join_game.player_id
[docs] async def leave(self) -> None: """You can use 'await self.client.leave()' to surrender midst game.""" is_resign = self._game_result is None if is_resign: # For all clients that can leave, result of leaving the game either # loss, or the client will ignore the result self._game_result = {self._player_id: Result.Defeat} try: if self.save_replay_path is not None: await self.save_replay(self.save_replay_path) self.save_replay_path = None await self._execute(leave_game=sc_pb.RequestLeaveGame()) except (ProtocolError, ConnectionAlreadyClosedError): if is_resign: raise
async def save_replay(self, path: str) -> None: logger.debug("Requesting replay from server") result = await self._execute(save_replay=sc_pb.RequestSaveReplay()) with Path(path).open("wb") as f: f.write(result.save_replay.data) logger.info(f"Saved replay to {path}") async def observation(self, game_loop: int | None = None): if game_loop is not None: result = await self._execute(observation=sc_pb.RequestObservation(game_loop=game_loop)) else: result = await self._execute(observation=sc_pb.RequestObservation()) assert result.HasField("observation") if not self.in_game or result.observation.player_result: # Sometimes game ends one step before results are available if not result.observation.player_result: result = await self._execute(observation=sc_pb.RequestObservation()) assert result.observation.player_result player_id_to_result = dict[int, Result]() for pr in result.observation.player_result: player_id_to_result[pr.player_id] = Result(pr.result) self._game_result = player_id_to_result # if render_data is available, then RGB rendering was requested if self._renderer and result.observation.observation.HasField("render_data"): await self._renderer.render(result.observation) return result
[docs] async def step(self, step_size: int | None = None): """EXPERIMENTAL: Change self._client.game_step during the step function to increase or decrease steps per second""" step_size = step_size or self.game_step return await self._execute(step=sc_pb.RequestStep(count=step_size))
async def get_game_data(self) -> GameData: result: sc_pb.Response = await self._execute( data=sc_pb.RequestData(ability_id=True, unit_type_id=True, upgrade_id=True, buff_id=True, effect_id=True) ) return GameData(result.data)
[docs] async def dump_data( self, ability_id: bool = True, unit_type_id: bool = True, upgrade_id: bool = True, buff_id: bool = True, effect_id: bool = True, ) -> None: """ Dump the game data files choose what data to dump in the keywords this function writes to a text file call it one time in on_step with: await self._client.dump_data() """ result = await self._execute( data=sc_pb.RequestData( ability_id=ability_id, unit_type_id=unit_type_id, upgrade_id=upgrade_id, buff_id=buff_id, effect_id=effect_id, ) ) with Path("data_dump.txt").open("a") as file: file.write(str(result.data))
async def get_game_info(self) -> GameInfo: result = await self._execute(game_info=sc_pb.RequestGameInfo()) return GameInfo(result.game_info) async def actions(self, actions: list[UnitCommand], return_successes: bool = False) -> list[ActionResult]: if not actions: return [] if not isinstance(actions, list): actions = [actions] # On realtime=True, might get an error here: sc2.protocol.ProtocolError: ['Not in a game'] try: response = await self._execute( action=sc_pb.RequestAction( # pyrefly: ignore actions=(sc_pb.Action(action_raw=action) for action in combine_actions(actions)) ) ) except ProtocolError: return [] if return_successes: return [ActionResult(result) for result in response.action.result] return [ ActionResult(result) for result in response.action.result if ActionResult(result) != ActionResult.Success ]
[docs] async def query_pathing(self, start: Unit | Point2 | Point3, end: Point2 | Point3) -> float | None: """Caution: returns "None" when path not found Try to combine queries with the function below because the pathing query is generally slow. :param start: :param end:""" assert isinstance(start, (Point2, Unit)) assert isinstance(end, Point2) if isinstance(start, Point2): path = [query_pb.RequestQueryPathing(start_pos=start.as_Point2D, end_pos=end.as_Point2D)] else: path = [query_pb.RequestQueryPathing(unit_tag=start.tag, end_pos=end.as_Point2D)] result = await self._execute(query=query_pb.RequestQuery(pathing=path)) distance = float(result.query.pathing[0].distance) if distance <= 0.0: return None return distance
[docs] async def query_pathings(self, zipped_list: list[tuple[Unit | Point2 | Point3, Point2 | Point3]]) -> list[float]: """Usage: await self.query_pathings([[unit1, target2], [unit2, target2]]) -> returns [distance1, distance2] Caution: returns 0 when path not found :param zipped_list: """ assert zipped_list, "No entry in zipped_list" path = ( query_pb.RequestQueryPathing( # pyrefly: ignore unit_tag=p1.tag if isinstance(p1, Unit) else None, # pyrefly: ignore start_pos=None if isinstance(p1, Unit) else p1.as_Point2D, end_pos=p2.as_Point2D, ) for p1, p2 in zipped_list ) # pyrefly: ignore results = await self._execute(query=query_pb.RequestQuery(pathing=path)) return [float(d.distance) for d in results.query.pathing]
async def _query_building_placement_fast( self, ability: AbilityId, positions: list[Point2 | Point3], ignore_resources: bool = True ) -> list[bool]: """ Returns a list of booleans. Return True for positions that are valid, False otherwise. :param ability: :param positions: :param ignore_resources: """ result = await self._execute( query=query_pb.RequestQuery( # pyrefly: ignore placements=( query_pb.RequestQueryBuildingPlacement(ability_id=ability.value, target_pos=position.as_Point2D) for position in positions ), ignore_resource_requirements=ignore_resources, ) ) # Success enum value is 1, see https://github.com/Blizzard/s2client-proto/blob/9906df71d6909511907d8419b33acc1a3bd51ec0/s2clientprotocol/error.proto#L7 return [p.result == 1 for p in result.query.placements]
[docs] async def query_building_placement( self, ability: AbilityData, positions: list[Point2 | Point3], ignore_resources: bool = True, # pyre-fixme[11] ) -> list[ActionResult]: """This function might be deleted in favor of the function above (_query_building_placement_fast). :param ability: :param positions: :param ignore_resources:""" assert isinstance(ability, AbilityData) result = await self._execute( query=query_pb.RequestQuery( # pyrefly: ignore placements=( query_pb.RequestQueryBuildingPlacement(ability_id=ability.id.value, target_pos=position.as_Point2D) for position in positions ), ignore_resource_requirements=ignore_resources, ) ) # Unnecessary converting to ActionResult? return [ActionResult(p.result) for p in result.query.placements]
[docs] async def query_available_abilities( self, units: list[Unit] | Units, ignore_resource_requirements: bool = False ) -> list[list[AbilityId]]: """Query abilities of multiple units""" input_was_a_list = True if not isinstance(units, list): """ Deprecated, accepting a single unit may be removed in the future, query a list of units instead """ assert isinstance(units, Unit) units = [units] input_was_a_list = False assert units result = await self._execute( query=query_pb.RequestQuery( # pyrefly: ignore abilities=(query_pb.RequestQueryAvailableAbilities(unit_tag=unit.tag) for unit in units), ignore_resource_requirements=ignore_resource_requirements, ) ) """ Fix for bots that only query a single unit, may be removed soon """ if not input_was_a_list: # pyrefly: ignore return [[AbilityId(a.ability_id) for a in b.abilities] for b in result.query.abilities][0] return [[AbilityId(a.ability_id) for a in b.abilities] for b in result.query.abilities]
[docs] async def query_available_abilities_with_tag( self, units: list[Unit] | Units, ignore_resource_requirements: bool = False ) -> dict[int, set[AbilityId]]: """Query abilities of multiple units""" result = await self._execute( query=query_pb.RequestQuery( # pyrefly: ignore abilities=(query_pb.RequestQueryAvailableAbilities(unit_tag=unit.tag) for unit in units), ignore_resource_requirements=ignore_resource_requirements, ) ) return {b.unit_tag: {AbilityId(a.ability_id) for a in b.abilities} for b in result.query.abilities}
[docs] async def chat_send(self, message: str, team_only: bool) -> None: """Writes a message to the chat""" ch = ChatChannel.Team if team_only else ChatChannel.Broadcast await self._execute( action=sc_pb.RequestAction( actions=[ sc_pb.Action( action_chat=sc_pb.ActionChat(channel=ch.value, message=message) # type: ignore[bad-argument-type] ) ] ) )
[docs] async def toggle_autocast(self, units: list[Unit] | Units, ability: AbilityId) -> None: """Toggle autocast of all specified units :param units: :param ability:""" assert units assert isinstance(units, list) assert all(isinstance(u, Unit) for u in units) assert isinstance(ability, AbilityId) await self._execute( action=sc_pb.RequestAction( actions=[ sc_pb.Action( action_raw=raw_pb.ActionRaw( toggle_autocast=raw_pb.ActionRawToggleAutocast( ability_id=ability.value, # pyrefly: ignore unit_tags=(u.tag for u in units), ) ) ) ] ) )
[docs] async def debug_create_unit( self, unit_spawn_commands: list[tuple[UnitTypeId, int, Point2 | Point3, Literal[1, 2]]] ) -> None: """Usage example (will spawn 5 marines in the center of the map for player ID 1): await self._client.debug_create_unit([[UnitTypeId.MARINE, 5, self._game_info.map_center, 1]]) :param unit_spawn_commands:""" assert unit_spawn_commands, "List is empty" await self._execute( debug=sc_pb.RequestDebug( # pyrefly: ignore debug=( debug_pb.DebugCommand( create_unit=debug_pb.DebugCreateUnit( unit_type=unit_type.value, owner=owner_id, pos=position.as_Point2D, quantity=amount_of_units, ) ) for unit_type, amount_of_units, position, owner_id in unit_spawn_commands ) ) )
[docs] async def debug_kill_unit(self, unit_tags: Unit | Units | list[int] | set[int]) -> None: """ :param unit_tags: """ if isinstance(unit_tags, Units): unit_tags = unit_tags.tags if isinstance(unit_tags, Unit): unit_tags = [unit_tags.tag] assert unit_tags await self._execute( # pyrefly: ignore debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(kill_unit=debug_pb.DebugKillUnit(tag=unit_tags))]) )
[docs] async def move_camera(self, position: Unit | Units | Point2 | Point3) -> None: """Moves camera to the target position :param position:""" assert isinstance(position, (Unit, Units, Point2, Point3)) if isinstance(position, Units): position = position.center if isinstance(position, Unit): position = position.position await self._execute( action=sc_pb.RequestAction( actions=[ sc_pb.Action( action_raw=raw_pb.ActionRaw( camera_move=raw_pb.ActionRawCameraMove(center_world_space=position.to3.as_Point) ) ) ] ) )
[docs] async def obs_move_camera(self, position: Unit | Units | Point2 | Point3, distance: float = 0) -> None: """Moves observer camera to the target position. Only works when observing (e.g. watching the replay). :param position:""" assert isinstance(position, (Unit, Units, Point2, Point3)) if isinstance(position, Units): position = position.center if isinstance(position, Unit): position = position.position await self._execute( obs_action=sc_pb.RequestObserverAction( actions=[ sc_pb.ObserverAction( camera_move=sc_pb.ActionObserverCameraMove(world_pos=position.as_Point2D, distance=distance) ) ] ) )
[docs] async def move_camera_spatial(self, position: Point2 | Point3) -> None: """Moves camera to the target position using the spatial aciton interface :param position:""" assert isinstance(position, (Point2, Point3)) action = sc_pb.Action( action_render=spatial_pb.ActionSpatial( camera_move=spatial_pb.ActionSpatialCameraMove(center_minimap=position.as_PointI) ) ) await self._execute(action=sc_pb.RequestAction(actions=[action]))
[docs] def debug_text_simple(self, text: str) -> None: """Draws a text in the top left corner of the screen (up to a max of 6 messages fit there).""" self._debug_texts.append(DrawItemScreenText(text=text, color=None, start_point=Point2((0, 0)), font_size=8))
[docs] def debug_text_screen( self, text: str, pos: Point2 | Point3 | tuple[float, float] | list[float], color: tuple[float, float, float] | list[float] | Point3 | None = None, size: int = 8, ) -> None: """ Draws a text on the screen (monitor / game window) with coordinates 0 <= x, y <= 1. :param text: :param pos: :param color: :param size: """ assert len(pos) >= 2 assert 0 <= pos[0] <= 1 assert 0 <= pos[1] <= 1 pos = Point2((pos[0], pos[1])) self._debug_texts.append(DrawItemScreenText(text=text, color=color, start_point=pos, font_size=size))
def debug_text_2d( self, text: str, pos: Point2 | Point3 | tuple[float, float] | list[float], color: tuple[float, float, float] | list[float] | Point3 | None = None, size: int = 8, ): return self.debug_text_screen(text, pos, color, size)
[docs] def debug_text_world( self, text: str, pos: Unit | Point3, color: tuple[float, float, float] | list[float] | Point3 | None = None, size: int = 8, ) -> None: """ Draws a text at Point3 position in the game world. To grab a unit's 3d position, use unit.position3d Usually the Z value of a Point3 is between 8 and 14 (except for flying units). Use self.get_terrain_z_height() from bot_ai.py to get the Z value (height) of the terrain at a 2D position. :param text: :param color: :param size: """ if isinstance(pos, Unit): pos = pos.position3d assert isinstance(pos, Point3) self._debug_texts.append(DrawItemWorldText(text=text, color=color, start_point=pos, font_size=size))
def debug_text_3d( self, text: str, pos: Unit | Point3, color: tuple[float, float, float] | list[float] | Point3 | None = None, size: int = 8, ): return self.debug_text_world(text, pos, color, size)
[docs] def debug_line_out( self, p0: Unit | Point3, p1: Unit | Point3, color: tuple[float, float, float] | list[float] | Point3 | None = None, ) -> None: """ Draws a line from p0 to p1. :param p0: :param p1: :param color: """ if isinstance(p0, Unit): p0 = p0.position3d assert isinstance(p0, Point3) if isinstance(p1, Unit): p1 = p1.position3d assert isinstance(p1, Point3) self._debug_lines.append(DrawItemLine(color=color, start_point=p0, end_point=p1))
[docs] def debug_box_out( self, p_min: Unit | Point3, p_max: Unit | Point3, color: tuple[float, float, float] | list[float] | Point3 | None = None, ) -> None: """ Draws a box with p_min and p_max as corners of the box. :param p_min: :param p_max: :param color: """ if isinstance(p_min, Unit): p_min = p_min.position3d assert isinstance(p_min, Point3) if isinstance(p_max, Unit): p_max = p_max.position3d assert isinstance(p_max, Point3) self._debug_boxes.append(DrawItemBox(start_point=p_min, end_point=p_max, color=color))
[docs] def debug_box2_out( self, pos: Unit | Point3, half_vertex_length: float = 0.25, color: tuple[float, float, float] | list[float] | Point3 | None = None, ) -> None: """ Draws a box center at a position 'pos', with box side lengths (vertices) of two times 'half_vertex_length'. :param pos: :param half_vertex_length: :param color: """ if isinstance(pos, Unit): pos = pos.position3d assert isinstance(pos, Point3) p0 = pos + Point3((-half_vertex_length, -half_vertex_length, -half_vertex_length)) p1 = pos + Point3((half_vertex_length, half_vertex_length, half_vertex_length)) self._debug_boxes.append(DrawItemBox(start_point=p0, end_point=p1, color=color))
[docs] def debug_sphere_out( self, p: Unit | Point3, r: float, color: tuple[float, float, float] | list[float] | Point3 | None = None, ) -> None: """ Draws a sphere at point p with radius r. :param p: :param r: :param color: """ if isinstance(p, Unit): p = p.position3d assert isinstance(p, Point3) self._debug_spheres.append(DrawItemSphere(start_point=p, radius=r, color=color))
async def _send_debug(self) -> None: """Sends the debug draw execution. This is run by main.py now automatically, if there is any items in the list. You do not need to run this manually any longer. Check examples/terran/ramp_wall.py for example drawing. Each draw request needs to be sent again in every single on_step iteration. """ debug_hash = ( sum(hash(item) for item in self._debug_texts), sum(hash(item) for item in self._debug_lines), sum(hash(item) for item in self._debug_boxes), sum(hash(item) for item in self._debug_spheres), ) if debug_hash != (0, 0, 0, 0): if debug_hash != self._debug_hash_tuple_last_iteration: # Something has changed, either more or less is to be drawn, or a position of a drawing changed (e.g. when drawing on a moving unit) self._debug_hash_tuple_last_iteration = debug_hash try: await self._execute( debug=sc_pb.RequestDebug( debug=[ debug_pb.DebugCommand( draw=debug_pb.DebugDraw( # pyrefly: ignore text=[text.to_proto() for text in self._debug_texts] if self._debug_texts else None, # pyrefly: ignore lines=[line.to_proto() for line in self._debug_lines] if self._debug_lines else None, # pyrefly: ignore boxes=[box.to_proto() for box in self._debug_boxes] if self._debug_boxes else None, # pyrefly: ignore spheres=[sphere.to_proto() for sphere in self._debug_spheres] if self._debug_spheres else None, ) ) ] ) ) except ProtocolError: return self._debug_draw_last_frame = True self._debug_texts.clear() self._debug_lines.clear() self._debug_boxes.clear() self._debug_spheres.clear() elif self._debug_draw_last_frame: # Clear drawing if we drew last frame but nothing to draw this frame self._debug_hash_tuple_last_iteration = (0, 0, 0, 0) await self._execute( debug=sc_pb.RequestDebug( debug=[ # pyrefly: ignore debug_pb.DebugCommand(draw=debug_pb.DebugDraw(text=None, lines=None, boxes=None, spheres=None)) ] ) ) self._debug_draw_last_frame = False async def debug_leave(self) -> None: await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(end_game=debug_pb.DebugEndGame())]))
[docs] async def debug_set_unit_value( self, unit_tags: Iterable[int] | Units | Unit, unit_value: int, value: float ) -> None: """Sets a "unit value" (Energy, Life or Shields) of the given units to the given value. Can't set the life of a unit to 0, use "debug_kill_unit" for that. Also can't set the life above the unit's maximum. The following example sets the health of all your workers to 1: await self.debug_set_unit_value(self.workers, 2, value=1)""" if isinstance(unit_tags, Units): unit_tags = unit_tags.tags if isinstance(unit_tags, Unit): unit_tags = [unit_tags.tag] assert hasattr(unit_tags, "__iter__"), ( f"unit_tags argument needs to be an iterable (list, dict, set, Units), given argument is {type(unit_tags).__name__}" ) assert 1 <= unit_value <= 3, ( f"unit_value needs to be between 1 and 3 (1 for energy, 2 for life, 3 for shields), given argument is {unit_value}" ) assert all(tag > 0 for tag in unit_tags), f"Unit tags have invalid value: {unit_tags}" assert isinstance(value, (int, float)), "Value needs to be of type int or float" assert value >= 0, "Value can't be negative" await self._execute( debug=sc_pb.RequestDebug( debug=( debug_pb.DebugCommand( unit_value=debug_pb.DebugSetUnitValue( unit_value=unit_value, # pyrefly: ignore[bad-argument-type] value=float(value), unit_tag=unit_tag, ) ) for unit_tag in unit_tags ) ) )
[docs] async def debug_hang(self, delay_in_seconds: float) -> None: """Freezes the SC2 client. Not recommended to be used.""" delay_in_ms = int(round(delay_in_seconds * 1000)) await self._execute( debug=sc_pb.RequestDebug( debug=[ debug_pb.DebugCommand( test_process=debug_pb.DebugTestProcess( test=1, # pyrefly: ignore delay_ms=delay_in_ms, ) ) ] ) )
[docs] async def debug_show_map(self) -> None: """Reveals the whole map for the bot. Using it a second time disables it again.""" await self._execute( debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=1)]) # pyrefly: ignore[bad-argument-type] )
[docs] async def debug_control_enemy(self) -> None: """Allows control over enemy units and structures similar to team games control - does not allow the bot to spend the opponent's ressources. Using it a second time disables it again.""" await self._execute( debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=2)]) # pyrefly: ignore[bad-argument-type] )
[docs] async def debug_food(self) -> None: """Should disable food usage (does not seem to work?). Using it a second time disables it again.""" await self._execute( debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=3)]) # pyrefly: ignore[bad-argument-type] )
[docs] async def debug_free(self) -> None: """Units, structures and upgrades are free of mineral and gas cost. Using it a second time disables it again.""" await self._execute( debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=4)]) # pyrefly: ignore[bad-argument-type] )
[docs] async def debug_all_resources(self) -> None: """Gives 5000 minerals and 5000 vespene to the bot.""" await self._execute( debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=5)]) # pyrefly: ignore[bad-argument-type] )
[docs] async def debug_god(self) -> None: """Your units and structures no longer take any damage. Using it a second time disables it again.""" await self._execute( debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=6)]) # pyrefly: ignore[bad-argument-type] )
[docs] async def debug_minerals(self) -> None: """Gives 5000 minerals to the bot.""" await self._execute( debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=7)]) # pyrefly: ignore[bad-argument-type] )
[docs] async def debug_gas(self) -> None: """Gives 5000 vespene to the bot. This does not seem to be working.""" await self._execute( debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=8)]) # pyrefly: ignore[bad-argument-type] )
[docs] async def debug_cooldown(self) -> None: """Disables cooldowns of unit abilities for the bot. Using it a second time disables it again.""" await self._execute( debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=9)]) # pyrefly: ignore[bad-argument-type] )
[docs] async def debug_tech_tree(self) -> None: """Removes all tech requirements (e.g. can build a factory without having a barracks). Using it a second time disables it again.""" await self._execute( debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=10)]) # pyrefly: ignore[bad-argument-type] )
[docs] async def debug_upgrade(self) -> None: """Researches all currently available upgrades. E.g. using it once unlocks combat shield, stimpack and 1-1. Using it a second time unlocks 2-2 and all other upgrades stay researched.""" await self._execute( debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=11)]) # pyrefly: ignore[bad-argument-type] )
[docs] async def debug_fast_build(self) -> None: """Sets the build time of units and structures and upgrades to zero. Using it a second time disables it again.""" await self._execute( debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=12)]) # pyrefly: ignore[bad-argument-type] )
[docs] async def quick_save(self) -> None: """Saves the current game state to an in-memory bookmark. See: https://github.com/Blizzard/s2client-proto/blob/eeaf5efaea2259d7b70247211dff98da0a2685a2/s2clientprotocol/sc2api.proto#L93""" await self._execute(quick_save=sc_pb.RequestQuickSave())
[docs] async def quick_load(self) -> None: """Loads the game state from the previously stored in-memory bookmark. Caution: - The SC2 Client will crash if the game wasn't quicksaved - The bot step iteration counter will not reset - self.state.game_loop will be set to zero after the quickload, and self.time is dependant on it""" await self._execute(quick_load=sc_pb.RequestQuickLoad())
class DrawItem: @staticmethod def to_debug_color(color: tuple[float, float, float] | list[float] | Point3 | None = None) -> debug_pb.Color: """Helper function for color conversion""" if color is None: return debug_pb.Color(r=255, g=255, b=255) # Need to check if not of type Point3 because Point3 inherits from tuple if isinstance(color, (tuple, list)) or isinstance(color, Point3) and len(color) == 3: return debug_pb.Color(r=int(color[0]), g=int(color[1]), b=int(color[2])) # In case color is of type Point3 r = getattr(color, "r", getattr(color, "x", 255)) g = getattr(color, "g", getattr(color, "y", 255)) b = getattr(color, "b", getattr(color, "z", 255)) if max(r, g, b) <= 1: r *= 255 g *= 255 b *= 255 return debug_pb.Color(r=int(r), g=int(g), b=int(b)) class DrawItemScreenText(DrawItem): def __init__( self, start_point: Point2, color: tuple[float, float, float] | list[float] | Point3 | None = None, text: str = "", font_size: int = 8, ) -> None: self._start_point = start_point self._color = color self._text = text self._font_size = font_size def to_proto(self): return debug_pb.DebugText( color=self.to_debug_color(self._color), text=self._text, virtual_pos=self._start_point.to3.as_Point, # pyrefly: ignore world_pos=None, size=self._font_size, ) def __hash__(self) -> int: return hash((self._start_point, self._color, self._text, self._font_size)) class DrawItemWorldText(DrawItem): def __init__( self, start_point: Point3, color: tuple[float, float, float] | list[float] | Point3 | None, text: str = "", font_size: int = 8, ) -> None: self._start_point = start_point self._color = color self._text = text self._font_size = font_size def to_proto(self): return debug_pb.DebugText( color=self.to_debug_color(self._color), text=self._text, # pyrefly: ignore virtual_pos=None, world_pos=self._start_point.as_Point, size=self._font_size, ) def __hash__(self) -> int: return hash((self._start_point, self._text, self._font_size, self._color)) class DrawItemLine(DrawItem): def __init__( self, start_point: Point3, end_point: Point3, color: tuple[float, float, float] | list[float] | Point3 | None = None, ) -> None: self._start_point = start_point self._end_point = end_point self._color = color def to_proto(self): return debug_pb.DebugLine( line=debug_pb.Line(p0=self._start_point.as_Point, p1=self._end_point.as_Point), color=self.to_debug_color(self._color), ) def __hash__(self) -> int: return hash((self._start_point, self._end_point, self._color)) class DrawItemBox(DrawItem): def __init__( self, start_point: Point3, end_point: Point3, color: tuple[float, float, float] | list[float] | Point3 | None = None, ) -> None: self._start_point = start_point self._end_point = end_point self._color = color def to_proto(self): return debug_pb.DebugBox( min=self._start_point.as_Point, max=self._end_point.as_Point, color=self.to_debug_color(self._color), ) def __hash__(self) -> int: return hash((self._start_point, self._end_point, self._color)) class DrawItemSphere(DrawItem): def __init__( self, start_point: Point3, radius: float, color: tuple[float, float, float] | list[float] | Point3 | None = None, ) -> None: self._start_point = start_point self._radius = radius self._color = color def to_proto(self): return debug_pb.DebugSphere( p=self._start_point.as_Point, r=self._radius, color=self.to_debug_color(self._color) ) def __hash__(self) -> int: return hash((self._start_point, self._radius, self._color))