Source code for main

import time
import logging
import copy
import os
import sys
import pandas as pd
from datetime import datetime, timezone, timedelta
# GridAPPS-D
from sim import Sim
import agents as AG
from cimgraph.data_profile import CIM_PROFILE
import gridappsd.field_interface.agents.agents as agents_mod

from gridappsd.difference_builder import DifferenceBuilder
from gridappsd.field_interface.interfaces import MessageBusDefinition
import gridappsd.topics as t


from gridappsd.field_interface.context import LocalContext
# Helper functions
import helper as HP
import area_helper_functions as PSA


#### Initialize log and file location
log = logging.getLogger(__name__)
file_location = os.path.dirname(os.path.abspath(__file__))

#### Required variables
cim_profile = CIM_PROFILE.RC4_2021.value
agents_mod.set_cim_profile(cim_profile, iec61970_301=7)

#### constant variables
[docs] class InitializationHardCodedTimeSleep: """A hard-coded configuration class with number variables. Provide the sleep times needed to properly initialize the simulation Attributes: interval_1 (float): Time interval in seconds. Await simulation to start. interval_2 (float): Time interval in seconds. Internal loop to pause the simulation. interval_3 (float): Time interval in seconds. Await first pause stop to be concluded. interval_4 (float): Time interval in seconds. Running second at a time and waiting to get second precision simulation time. interval_5 (float): Time interval in seconds. Await CB changes to take full effect. interval_6 (float): Time interval in seconds. Internal loop for initializing the agents. interval_7 (float): Time interval in seconds. Time wait before emulating the fault event. interval_8 (float): Time interval in seconds. Needed time for the emulating fault event to take effect. internal_9 (float): Time interval in seconds. Internal loop verify if the simulation is done. """ interval_1 = 15.0 # time.sleep(15) await simulation to start interval_2 = 0.1 # time.sleep(0.1) internal loop pausing the simulation interval_3 = 30.0 # time.sleep(30) await first pause stop to be concluded interval_4 = 3.0 # time.sleep(3) running second at a time and waiting to get second precision simulation time interval_5 = 30.0 # time.sleep(10) await CB changes to take full effect interval_6 = 15.0 # time.sleep(10) internal loop for initializing the agents interval_7 = 3.0 # time await before emulating the fault event interval_8 = 1.0 # needed time for the emulating fault event to take effect interval_9 = 0.2 # Internal loop verify if the simulation is done.
#### Functions
[docs] def main(configuration: dict) -> None: """Main entry point for the simulation. This script initializes the simulation environment, loads required data, starts the simulation, populate the agents, and once the simulation is done the outputs are written to file. The results are saved to the specified output directory as configure. The argument configuration contains all the required information. This script can be run from the command line the **if __name__ == "__main__":** portion contain the file being loaded with the configuration. Example: Run the simulation with example settings: $ python /src/main.py Args: configuration (dict): Contains all the required information. Please refer to the larger documentation for details. """ global feeder_agent global area_agents #### additional data read ybus=PSA.additional_area_data(configuration) #### begin simulation log.debug("make and start simulation") sim = Sim(configuration['test_case']) time.sleep(InitializationHardCodedTimeSleep.interval_1) for i in range(100): sim.simulation.pause() time.sleep(InitializationHardCodedTimeSleep.interval_2) #### populate agents #### populate feeder agent system_bus = HP.overwrite_parameters(sim.get_feeder_id()) feeder_bus = system_bus log.debug("Making AG.SampleFeederAgent.") feeder_agent = AG.SampleFeederAgent(feeder_bus, system_bus, configuration['app_config'], None, sim.get_simulation_id(),sim,configuration) #### having time information in feeder agent log.debug(f"Providing feeder the simulation time") sim.simulation.resume_pause_at(60) time.sleep(InitializationHardCodedTimeSleep.interval_3) for _ in range(65): feeder_agent.time_control() time.sleep(InitializationHardCodedTimeSleep.interval_4) log.debug("Making AG.SampleSwitchAreaAgent.") area_agents=list() switch_areas = feeder_agent.agent_area_dict['switch_areas'] #### CB initial condition log.debug(f"Setting CB to initial condition.") cb=configuration['feeder_agent_configuration']['initial_CB_state'].split("_")[-1] temp=feeder_agent.system_status feeder_agent.system_status = feeder_agent.system_status.replace("1", "5").replace("0", "5") feeder_agent.update_cb_status(cb) feeder_agent.system_status = temp time.sleep(InitializationHardCodedTimeSleep.interval_5) #### populate switch agents for sw_idx, switch_area in enumerate(switch_areas): name="" for key, value in configuration["feeder_agent_configuration"]["switch_area_names"].items(): if set(switch_area["boundary_switches"]) == set(value): name = key configuration['area_agent_configuration']['ybus']=ybus[name] break switch_bus = HP.overwrite_parameters(sim.get_feeder_id(), f"{sw_idx}") # additional exclusive area information (i.e., ybus) area_agents.append( AG.SampleSwitchAreaAgent(feeder_bus, switch_bus, configuration['app_config'], switch_area, sim.get_simulation_id(), name=name, agent_config=copy.deepcopy(configuration)) ) log.debug(f"Created agent: {sw_idx}") #### initializing the agents for _ in range(65): feeder_agent.time_control() time.sleep(InitializationHardCodedTimeSleep.interval_6) #### start feeder agent time control log.debug(f"Starting feeder at time control") feeder_agent._perform_self_time_control_call=True # feeder_agent._perform_self_time_control_call=False feeder_agent.time_control() #### Emulate fault time.sleep(InitializationHardCodedTimeSleep.interval_7) log.debug(f"Emulate fault") for i in configuration['feeder_agent_configuration']['forced_event']['breaker_status_change']: feeder_agent.open_CB(i) time.sleep(InitializationHardCodedTimeSleep.interval_8) # needed time to all commands to take effect log.debug(f"SImulation is at fault.") #### awaiting simulation to end while not sim.done: try: time.sleep(InitializationHardCodedTimeSleep.interval_9) except (Exception, KeyboardInterrupt) as e: log.info(f"Simulation stop due to: {e}") break feeder_agent.monitoring_to_file() # save monitor data to file
if __name__ == "__main__": # read TOML os.path.expanduser('~/grid/dltdflisr/config/input_config.toml') configuration = HP.load_toml(f'{file_location}/../config/input_config.toml') # configure logging HP.configure_logging(**configuration['logging_configuration']) # create sport environmental variables HP.os_variables(configuration['system_variables']) #### run main main(configuration)