import pandas as pd
import uuid
from datetime import datetime, timedelta
import logging
log = logging.getLogger(__name__)
[docs]
class DataTrekker:
"""A class used to process and store sensor data from simulations.
Differently than the class DataProcessor. This class store data is utilized for decision making. Furthermore, this class
does not write any information to file. The simulation of the distributed power grid sensors is converted to the sensors
according to the input sensor information with specific variances (i.e., precision). The data can be called to be utilized in
evaluating the switch delimited area during the simulation.
Attributes:
meta_data (pd.DataFrame): The metadata for the simulation.
sensors (pd.DataFrame): The sensor data for the simulation.
max_rows (int): The maximum number of rows to store before removing old data.
columns (list): The columns of the data being processed.
sensor_data (pd.DataFrame): A DataFrame to temporarily store sensor data.
"""
def __init__(self,
meta_data:pd.DataFrame,
sensors:pd.DataFrame,
max_rows:int=200000) -> None:
"""Initializes the DataTrekker with the given parameters.
Args:
meta_data (pd.DataFrame): The metadata for the simulation.
sensors (pd.DataFrame): The sensor data for the simulation.
max_rows (int, optional): The maximum number of rows to store before removing old data. Defaults to 200000.
"""
self.process_metadata(meta_data.copy())
self.process_sensors(sensors.copy())
self.max_rows=max_rows
self.columns=['GUID', 'timestamp', 'measurement']
self.sensor_data = []
[docs]
def process_sensors(self,sensors: pd.DataFrame) -> None:
"""Processes and updates the sensor data.
Args:
sensors (pd.DataFrame): The sensor data to be processed.
"""
sensors['GUID'] = [str(uuid.uuid4()) for _ in range(len(sensors))]
sensors['node1']=sensors['node1'].str.lower()
sensors['node2']=sensors['node2'].str.lower()
self.sensors=sensors
[docs]
def add_receive_data(self,new_data:pd.DataFrame) -> None:
"""Adds new sensor data to the existing data storage.
Args:
new_data (pd.DataFrame): The new data to be added.
"""
if len(self.sensor_data) > 0:
self.sensor_data = pd.concat([self.sensor_data, new_data], ignore_index=True)
else:
self.sensor_data = new_data.copy(deep=True)
[docs]
def receive_data(self, unique_id: str, timestamp: pd.Timestamp, value: complex) -> None:
"""Receives and processes incoming sensor data.
Args:
unique_id (str): The unique identifier for the data entry.
timestamp (pd.Timestamp): The timestamp of the data entry.
value (complex): The value of the data entry.
"""
sensors=self.sensors
meta_data=self.meta_data
meta={**meta_data.loc[meta_data['measurement_mRID']==(unique_id),['f_node_name','t_node_name','measurement_type']].to_dict(orient='records')[0]}
try:
if meta['measurement_type'] == 'VA':
unique_id=sensors.loc[(sensors['sensor_type']=='Pi')&(sensors['node1']==meta['f_node_name']),'GUID'].item()
self.add_receive_data(pd.DataFrame([[unique_id, timestamp, value.real]], columns=self.columns))
unique_id=sensors.loc[(sensors['sensor_type']=='Qi')&(sensors['node1']==meta['f_node_name']),'GUID'].item()
self.add_receive_data(pd.DataFrame([[unique_id, timestamp, value.imag]], columns=self.columns))
elif meta['measurement_type'] == 'PNV':
unique_id=sensors.loc[(sensors['sensor_type']=='Vi')&(sensors['node1']==meta['f_node_name']),'GUID'].item()
self.add_receive_data(pd.DataFrame([[unique_id, timestamp, abs(value)]], columns=self.columns))
except Exception as e:
log.debug(f"meta: {meta},unique_id: {unique_id},timestamp: {timestamp},value: {value}, exception: {e}.")
[docs]
def get_most_recent_data(self) -> pd.DataFrame:
"""Retrieves the most recent sensor data.
Returns:
pd.DataFrame: A DataFrame containing the most recent data.
"""
sensor_data = self.sensor_data
sensors=self.sensors.copy()
sensors.drop(columns='measurement',inplace=True)
latest_indices = sensor_data.groupby('GUID')['timestamp'].idxmax()
latest_info_df = sensor_data.loc[latest_indices].copy()
latest_info_df.drop(columns='timestamp',inplace=True)
data=sensors.merge(latest_info_df,on='GUID',how='inner')
self.organize_data()
self.remove_old_data()
return data.drop(columns='GUID')
[docs]
def organize_data(self) -> None:
"""
Organizes the sensor data by sorting and removing duplicates.
"""
self.sensor_data = self.sensor_data.sort_values(by='timestamp', ascending=False)
self.sensor_data=self.sensor_data.drop_duplicates()
self.sensor_data.reset_index(drop=True,inplace=True)
[docs]
def remove_old_data(self) -> None:
"""
Removes old sensor data exceeding the maximum row limit.
"""
self.sensor_data = self.sensor_data.iloc[-self.max_rows:]
if __name__ == "__main__":
import os
# Example usage
base_path = "~/grid/dltdflisr"
base_path = os.path.expanduser(base_path)
sensors=f"{base_path}/config/sensors.h5"
connections=f"{base_path}/config/area_connections.h5"
max_area_load=f"{base_path}/config/max_node_load.h5"
meta=f"{base_path}/outputs/record_00.h5"
measurement=f"{base_path}/outputs/switch_4.json"
sensors=pd.read_hdf(sensors)
meta_data=pd.read_hdf(meta,key='/meta_data')
# make object
processor=DataTrekker(meta_data,sensors)
# Receiving some data
processor.receive_data('_924dfad1-f264-4835-8c10-58782ab8e24e', datetime(2023, 6, 24, 12, 0), 10)
processor.receive_data('_924dfad1-f264-4835-8c10-58782ab8e24e', datetime(2023, 6, 24, 12, 5), 15)
processor.receive_data('_cadea118-f954-4402-ab81-e6db36e836fe', datetime(2023, 6, 24, 12, 10), 8)
processor.receive_data('_cadea118-f954-4402-ab81-e6db36e836fe', datetime(2023, 6, 24, 12, 0), 20)
processor.receive_data('_cadea118-f954-4402-ab81-e6db36e836fe', datetime(2023, 6, 24, 12, 5), 25)
# get latest data
print(processor.get_most_recent_data())