[python] Anvil Background Code
Viewer
*** This page was generated with the meta tag "noindex, nofollow". This happened because you selected this option before saving or the system detected it as spam. This means that this page will never get into the search engines and the search bot will not crawl it. There is nothing to worry about, you can still share it with anyone.
- # set the threshold for consecutive overuse
- self.overuse_counter = 0 # initialize consecutive overuse
def input_current_consumption(self, hour):
    """Read one consumption value from stdin and append it to ``self.current``.

    Args:
        hour: Accepted for interface compatibility; not used by this method.

    Raises:
        ValueError: If the typed text cannot be converted to ``float``.
    """
    prompt = f"Enter current consumption for {self.name}: "
    reading = float(input(prompt))
    self.current.append(reading)
def update_differences(self):
    """Refresh today's running sums and per-hour differences up to the current hour.

    For every hour from 0 through the current hour this stores the cumulative
    current consumption in ``self.sum_current``, the cumulative baseline in
    ``self.sum_baseline`` and the single-hour gap (current - baseline) in
    ``self.difference``.  Hours missing from a day's row count as 0.  If
    ``self.current`` has no row for today's weekday, a message is printed and
    only the list padding is performed.
    """
    moment = datetime.datetime.now()
    weekday = moment.weekday()  # Monday is 0 and Sunday is 6
    last_hour = moment.hour
    slots_needed = last_hour + 1

    # Pad the result lists so that every hour index up to now is assignable.
    for series in (self.sum_baseline, self.sum_current, self.difference):
        self.ensure_list_size(series, slots_needed)

    if weekday >= len(self.current):
        print(f"No current consumption data available for {self.name} on day {weekday} up to hour {last_hour}")
        return

    todays_current = self.current[weekday]
    todays_baseline = self.baseline[weekday]
    running_current = 0
    running_baseline = 0
    for hour in range(slots_needed):
        used = todays_current[hour] if hour < len(todays_current) else 0
        expected = todays_baseline[hour] if hour < len(todays_baseline) else 0
        running_current += used
        running_baseline += expected
        self.sum_current[hour] = running_current
        self.sum_baseline[hour] = running_baseline
        self.difference[hour] = used - expected
def calculate_hourly_consumption(self):
    """Turn the cumulative baseline sums into per-hour values.

    The result is stored in ``self.graph``, printed and returned.  When
    ``self.sum_baseline`` is empty a message is printed, ``self.graph`` is
    left untouched and ``None`` is returned.
    """
    running_totals = self.sum_baseline  # swap for self.sum_current to chart current usage
    if not running_totals:
        print(f"No data available for {self.name}")
        return

    # The first hour is its own total; every later hour is the step between
    # two neighbouring cumulative values.
    per_hour = [running_totals[0]]
    per_hour.extend(
        later - earlier
        for earlier, later in zip(running_totals, running_totals[1:])
    )
    self.graph = per_hour
    print(f"Hourly consumption for {self.name}: {self.graph}")
    return self.graph
def update_graph_with_percentage_differences(self):
    """Store the per-hour percentage gap between current and baseline in ``self.graph``.

    Hourly values are derived from the cumulative lists ``self.sum_current``
    and ``self.sum_baseline``.  Entry ``i`` of ``self.graph`` always belongs
    to hour ``i``: an hour whose baseline is 0 yields 0 instead of being
    dropped, which is the same placeholder
    ``calculate_cumulative_percentage_difference`` uses.  When either
    cumulative list is empty a message is printed and ``self.graph`` is left
    unchanged.
    """
    if not self.sum_baseline or not self.sum_current:
        print(f"No data available for {self.name}")
        return

    # Calculate hourly values from cumulative sums
    hourly_baseline = [self.sum_baseline[0]] + [
        later - earlier
        for earlier, later in zip(self.sum_baseline, self.sum_baseline[1:])
    ]
    hourly_current = [self.sum_current[0]] + [
        later - earlier
        for earlier, later in zip(self.sum_current, self.sum_current[1:])
    ]

    # Keep one value per hour so the list index keeps meaning "hour of day".
    self.graph = [
        (c - b) / b * 100 if b != 0 else 0
        for c, b in zip(hourly_current, hourly_baseline)
    ]
    print(f"Hourly Percentage Difference for {self.name}: {self.graph}")
def update_graph_difference(self):
    """Point ``self.graph`` at the ``self.difference`` list (no copy) and print it."""
    series = self.difference
    self.graph = series
    print(f"Difference for baseline of {self.name}: {series}")
def update_graph_current_consumption(self):
    """Point ``self.graph`` at the ``self.sum_current`` list (no copy) and print it."""
    series = self.sum_current
    self.graph = series
    print(f"Sum of the current consumption of {self.name}: {series}")
def calculate_cumulative_percentage_difference(self):
    """Compute, per hour, how far the cumulative current sum is from the baseline in percent.

    The list is stored in ``self.graph``, printed and returned.  Hours whose
    cumulative baseline is 0 get 0 so that no division by zero occurs.  If
    either cumulative list is empty a message is printed and ``None`` is
    returned.
    """
    if not (self.sum_current and self.sum_baseline):
        print(f"No data available for {self.name} to calculate cumulative percentage differences.")
        return

    self.graph = [
        (current - baseline) / baseline * 100 if baseline != 0 else 0
        for current, baseline in zip(self.sum_current, self.sum_baseline)
    ]
    print(f"Cumulative percentage differences for {self.name}: {self.graph}")
    return self.graph
def ensure_list_size(self, list_to_check, required_size):
    """Pad ``list_to_check`` in place with zeros until it holds ``required_size`` items.

    Lists that are already long enough are left untouched.
    """
    shortfall = required_size - len(list_to_check)
    if shortfall > 0:
        list_to_check.extend([0] * shortfall)
def print_differences(self):
    """Refresh the running sums via ``update_differences`` and print them."""
    # The sums must be recomputed first so the printed values are current.
    self.update_differences()
    report = (
        f"Differences for {self.name}:",
        f"Current Sum {self.sum_current}:",
        f"Baseline Sum {self.sum_baseline}:",
    )
    for line in report:
        print(line)
def compare_consumption(self, now=None):
    """Check the reading for the current hour against the baseline and track overuse.

    A reading counts as overuse when it exceeds 1.5x the baseline, or when
    the baseline is below 1 and the reading is above 2.  Each overuse bumps
    ``self.overuse_counter``; once the counter reaches
    ``self.overuse_threshold`` an alert is printed, otherwise a warning.  A
    normal reading resets the counter.  Nothing happens when there is no
    slot for the current weekday/hour in ``self.current``.

    The slot inspected is ``[weekday][hour]`` of the given moment, the same
    slot ``update_lab_current_consumption`` writes.  The earlier "round up
    after minute 30" step made this method read the following hour's slot,
    which has not been measured yet, and at 23:30 or later wrapped around to
    hour 0 of the same day.

    Args:
        now: Optional ``datetime.datetime`` to evaluate; defaults to the
            current local time.
    """
    moment = datetime.datetime.now() if now is None else now
    day_of_week = moment.weekday()  # 0 is Monday, 6 is Sunday
    hour_of_day = moment.hour

    if day_of_week >= len(self.current) or hour_of_day >= len(self.current[day_of_week]):
        return

    current_consumption = self.current[day_of_week][hour_of_day]
    baseline_consumption = self.baseline[day_of_week][hour_of_day]
    overusing = (
        (baseline_consumption < 1 and current_consumption > 2)
        or current_consumption > 1.5 * baseline_consumption
    )
    if not overusing:
        self.overuse_counter = 0  # the streak of consecutive overuse is broken
        return

    self.overuse_counter += 1
    if self.overuse_counter >= self.overuse_threshold:
        print(
            f"Alert: {self.name} is using too much power at hour {hour_of_day} for {self.overuse_threshold} consecutive times.")
    else:
        # Below the threshold only a warning is printed.
        print(f"Warning: {self.name} is using too much power at hour {hour_of_day}.")
def compare_consumption_manual(self, day_of_week, hour):
    """Run the overuse check for an explicitly chosen weekday/hour slot.

    A reading is overuse when it exceeds 1.5x the baseline, or when the
    baseline is below 1 and the reading is above 2.  Overuse increments
    ``self.overuse_counter`` and prints an alert once the counter reaches
    ``self.overuse_threshold``; a normal reading resets the counter.  Slots
    that do not exist in ``self.current`` are ignored.
    """
    if day_of_week >= len(self.current) or hour >= len(self.current[day_of_week]):
        return

    used = self.current[day_of_week][hour]
    expected = self.baseline[day_of_week][hour]
    spike_over_tiny_baseline = expected < 1 and used > 2
    above_one_and_a_half = used > 1.5 * expected

    if not (spike_over_tiny_baseline or above_one_and_a_half):
        self.overuse_counter = 0  # streak of consecutive overuse is broken
        return

    self.overuse_counter += 1
    if self.overuse_counter >= self.overuse_threshold:
        print(
            f"Alert: {self.name} is using too much power at hour {hour} for {self.overuse_threshold} consecutive times.")
# Deliberately left without @staticmethod: it takes no ``self`` and is
# called through the class as ``Lab.calculate_percentage_difference(...)``.
def calculate_percentage_difference(current_sum, baseline_sum):
    """Return how far ``current_sum`` is from ``baseline_sum``, in percent.

    A zero baseline no longer raises ``ZeroDivisionError``: the result is
    0.0 when both sums are equal, otherwise positive or negative infinity
    depending on the sign of the gap (``sum_and_compare_each_lab`` also
    reports infinity for a zero baseline).
    """
    gap = current_sum - baseline_sum
    if baseline_sum == 0:
        if gap == 0:
            return 0.0
        return float('inf') if gap > 0 else float('-inf')
    return gap / baseline_sum * 100
# Winner names appended by battle(); cleared by finalize_daily_results().
daily_winners = []
# Pairings used for the battles; replaced by randomize_pairings() results.
current_pairings = []

# Index into ``labs`` of the lab that fights twice when the lab count is odd.
current_double_index = 0


def randomize_pairings():
    """Return a random list of ``(lab, lab)`` battle pairings covering every lab.

    With an even number of labs each lab appears exactly once.  With an odd
    number, the lab at ``current_double_index`` appears twice so nobody is
    left without an opponent; it is never paired against itself.  The index
    advances on every call so the extra battle rotates through the labs.
    Returns an empty list when there are no labs or only one.
    """
    global current_double_index
    if not labs:
        return []

    order = random.sample(labs, len(labs))
    if len(order) % 2 == 1 and len(order) > 1:
        doubled = labs[current_double_index % len(labs)]
        # The lab left without a partner is paired with the doubled lab, so
        # the doubled lab must not be that leftover itself.
        if order[-1] is doubled:
            swap_with = random.randrange(len(order) - 1)
            order[-1], order[swap_with] = order[swap_with], order[-1]
        order.append(doubled)

    pairings = [(order[i], order[i + 1]) for i in range(0, len(order) - 1, 2)]
    # Rotate which lab gets the extra battle next time.
    current_double_index = (current_double_index + 1) % len(labs)
    return pairings
def finalize_daily_results():
    """Print the winners of the most recent battle cycle and clear ``daily_winners``.

    One winner is recorded per pairing, so the number of winners reported is
    the number of pairings in ``current_pairings``.  If no pairings are
    available it falls back to half the lab count rounded up, because an odd
    lab count produces one extra battle.  ``len(labs) // 2`` dropped one
    winner whenever the lab count was odd.
    """
    battles_per_cycle = len(current_pairings) or (len(labs) + 1) // 2
    # A count of 0 must not turn into ``[-0:]``, which would select everything.
    recent_winners = daily_winners[-battles_per_cycle:] if battles_per_cycle else []
    print("Final results for the day:")
    for winner in recent_winners:
        print(f"{winner} won the battle today.")
    # Start the next day of battles from an empty list.
    daily_winners.clear()


# Day of month at load time; compared against the clock to detect a new day.
last_check_day = datetime.datetime.now().day
def input_current_consumption(self, sensor_gid, vue):
    """Fetch the last minute of usage for one sensor and append it to ``self.current``.

    The first channel of the first device whose ``device_gid`` equals
    ``sensor_gid`` is queried through ``vue.get_chart_usage``.  A message is
    printed instead when no such device exists or when no usage comes back.
    """
    # First channel of the matching device (PyEmVue API use), or None.
    channel = next(
        (device.channels[0] for device in vue.get_devices() if device.device_gid == sensor_gid),
        None,
    )
    if not channel:
        print(f"Device with GID {sensor_gid} not found")
        return

    window_end = datetime.datetime.now(datetime.timezone.utc)
    window_start = window_end - datetime.timedelta(minutes=1)
    usage_over_time, _ = vue.get_chart_usage(
        channel, window_start, window_end,
        scale=Scale.MINUTE.value, unit=Unit.KWH.value,
    )
    if not usage_over_time:
        print(f"No data available for sensor GID: {sensor_gid}")
        return

    # NOTE(review): labelled a kWh-to-Watts conversion; the /2 and *1000
    # factors are kept exactly as found -- confirm the intended unit.
    total_usage_1_minute = (sum(usage_over_time) / 2) * 1000
    self.current.append(total_usage_1_minute)
def arrange_function(rankings):
    """Sort ``rankings`` in place, ascending by each entry's second item.

    Entries are ``[lab_name, percent_diff]`` pairs, so the lab with the
    lowest percentage difference ends up first.  Uses the built-in sort in
    place of a hand-written exchange sort; the built-in sort is stable, so
    entries with equal values keep their existing relative order.
    """
    rankings.sort(key=lambda entry: entry[1])
def initialize_current_consumption_with_baseline(lab_instance, baseline_data):
    """Seed ``lab_instance.current`` with an independent deep copy of ``baseline_data``."""
    snapshot = copy.deepcopy(baseline_data)
    lab_instance.current = snapshot
def update_lab_current_consumption(lab, total_consumption):
    """Store ``total_consumption`` in the lab's slot for the current weekday and hour.

    ``lab.current`` is extended with 24-zero day rows until it has a row for
    today's weekday (0 is Monday, 6 is Sunday).
    """
    moment = datetime.datetime.now()
    weekday = moment.weekday()
    missing_days = weekday + 1 - len(lab.current)
    # Each missing day gets its own list so the rows never alias each other.
    lab.current.extend([0] * 24 for _ in range(missing_days))
    lab.current[weekday][moment.hour] = total_consumption
def sum_consumption_and_baseline(labs):
    """Total current and baseline consumption of all labs from Monday until now.

    Days before today are summed in full; today is summed through the
    current hour.  The three figures are printed and returned as
    ``(total_current, total_baseline, difference)``.
    """
    moment = datetime.datetime.now()
    today = moment.weekday()  # Monday is 0 and Sunday is 6
    hours_elapsed_today = moment.hour + 1

    total_current_consumption = 0
    total_baseline_consumption = 0
    for lab in labs:
        for day in range(today + 1):
            # A slice end of None keeps the whole row for days already over.
            stop = hours_elapsed_today if day == today else None
            total_current_consumption += sum(lab.current[day][:stop])
            total_baseline_consumption += sum(lab.baseline[day][:stop])

    difference = total_current_consumption - total_baseline_consumption
    print(f"Total current consumption from all labs: {total_current_consumption}")
    print(f"Total baseline consumption from all labs: {total_baseline_consumption}")
    print(f"Difference between total consumption and baseline: {difference}")
    return total_current_consumption, total_baseline_consumption, difference
- """def battle(lab1, lab2, rankings):
- current_time = datetime.datetime.now()
- day_of_week = current_time.weekday() # 0 is Monday, 6 is Sunday
- hour_of_day = current_time.hour +1
- # Round up the time to the next hour for summation
- if current_time.minute >= 30:
- hour_of_day += 1
- print(hour_of_day)
- # Ensure hour_of_day does not exceed 23 (max index for hourly data)
- hour_of_day = min(hour_of_day, 23)
- # Sum up to the current (or next, if rounded up) hour
- sum_current_1 = sum(lab1.current[day_of_week][:hour_of_day])
- sum_baseline_1 = sum(lab1.baseline[day_of_week][:hour_of_day])
- print(sum_current_1)
- print(sum_baseline_1)
- sum_current_2 = sum(lab2.current[day_of_week][:hour_of_day])
- sum_baseline_2 = sum(lab2.baseline[day_of_week][:hour_of_day])
- percent_diff_1 = Lab.calculate_percentage_difference(sum_current_1, sum_baseline_1)
- percent_diff_2 = Lab.calculate_percentage_difference(sum_current_2, sum_baseline_2)
- rankings.append([lab1.name, percent_diff_1])
- rankings.append([lab2.name, percent_diff_2])
- winner = lab1.name if percent_diff_1 < percent_diff_2 else lab2.name
- daily_winners.append(winner) # Append the name of the winner to the daily_winners list
- print(f"The winner is {winner}.")
- print(f"Difference between {lab1.name} and baseline: {percent_diff_1:.2f}%")
- print(f"Difference between {lab2.name} and baseline: {percent_diff_2:.2f}%")"""
def battle(lab1, lab2, rankings):
    """Compare two labs' usage so far today against their baselines and record a winner.

    For each lab, today's current and baseline rows are summed up to the
    present hour (one hour further once the minute reaches 30) and turned
    into a percentage difference.  Both results are written to ``rankings``
    through ``update_ranking``.  The lab with the lower percentage wins (on
    a tie ``lab2`` wins) and its name is appended to ``daily_winners``.
    """
    current_time = datetime.datetime.now()
    day_of_week = current_time.weekday()  # 0 is Monday, 6 is Sunday
    hour_of_day = current_time.hour + 1
    # Round up the time to the next hour for summation
    if current_time.minute >= 30:
        hour_of_day += 1
    print(hour_of_day)
    # hour_of_day is an exclusive slice end, so the cap is 24 hourly slots.
    # Capping at 23 left hour index 23 out of every sum.
    hour_of_day = min(hour_of_day, 24)

    # Sum up to the current (or next, if rounded up) hour
    sum_current_1 = sum(lab1.current[day_of_week][:hour_of_day])
    sum_baseline_1 = sum(lab1.baseline[day_of_week][:hour_of_day])
    sum_current_2 = sum(lab2.current[day_of_week][:hour_of_day])
    sum_baseline_2 = sum(lab2.baseline[day_of_week][:hour_of_day])

    percent_diff_1 = Lab.calculate_percentage_difference(sum_current_1, sum_baseline_1)
    percent_diff_2 = Lab.calculate_percentage_difference(sum_current_2, sum_baseline_2)

    # Update rankings with checking for duplicates
    update_ranking(rankings, lab1.name, percent_diff_1)
    update_ranking(rankings, lab2.name, percent_diff_2)

    winner = lab1.name if percent_diff_1 < percent_diff_2 else lab2.name
    daily_winners.append(winner)
    print(f"The winner is {winner}.")
    print(f"Difference between {lab1.name} and baseline: {percent_diff_1:.2f}%")
    print(f"Difference between {lab2.name} and baseline: {percent_diff_2:.2f}%")
def update_ranking(rankings, lab_name, percent_diff):
    """Set ``lab_name``'s value in ``rankings``, adding a new entry when it has none.

    ``rankings`` is a list of ``[lab_name, percent_diff]`` lists and is
    modified in place.  Only the first entry with a matching name is updated.
    """
    existing = next((entry for entry in rankings if entry[0] == lab_name), None)
    if existing is None:
        rankings.append([lab_name, percent_diff])
    else:
        existing[1] = percent_diff
def save_lab_current_to_file(labs, folder_path):
    """Append a timestamped snapshot of every lab's ``current`` data to a log file.

    The file is ``lab_consumption_data.txt`` inside ``folder_path``; the
    folder is created when it does not exist.  Each snapshot is a timestamp
    line, one line per lab and a closing blank line.
    """
    folder_missing = not os.path.exists(folder_path)
    if folder_missing:
        os.makedirs(folder_path)

    destination = os.path.join(folder_path, "lab_consumption_data.txt")
    with open(destination, "a") as log:
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        entries = [f"Update Time: {stamp}\n"]
        entries.extend(f"{lab.name} current consumption: {lab.current}\n" for lab in labs)
        entries.append("\n")  # blank separator between snapshots
        log.writelines(entries)
- """def sum_and_compare_each_lab(labs):
- now = datetime.datetime.now()
- day_of_week = now.weekday() # Monday is 0 and Sunday is 6
- hour_of_day = now.hour
- # Iterate through each lab
- for lab in labs:
- # Initialize the sums for this lab
- lab_current_consumption = 0
- lab_baseline_consumption = 0
- # Sum the consumption from the start of the week to the current hour today for this lab
- for day in range(day_of_week + 1): # Include today
- if day < day_of_week: # Full day
- lab_current_consumption += sum(lab.current[day])
- lab_baseline_consumption += sum(lab.baseline[day])
- else: # Today, sum up to the current hour
- lab_current_consumption += sum(lab.current[day][:hour_of_day + 1])
- lab_baseline_consumption += sum(lab.baseline[day][:hour_of_day + 1])
- # Calculate the difference and percentage difference
- difference = lab_current_consumption - lab_baseline_consumption
- percent_difference = (difference / lab_baseline_consumption * 100) if lab_baseline_consumption != 0 else float(
- 'inf')
- # Print the results for this lab
- print(f"Lab {lab.name}:")
- print(f" Current consumption: {lab_current_consumption}")
- print(f" Baseline consumption: {lab_baseline_consumption}")
- print(f" Difference: {difference}")
- print(f" Percentage difference: {percent_difference:.2f}%\n")"""
def sum_and_compare_each_lab(labs):
    """Print each lab's latest cumulative current vs. baseline figures.

    The last entries of ``lab.sum_current`` and ``lab.sum_baseline`` are
    compared.  The percentage is reported as infinity when the baseline is
    0.  Labs with an empty cumulative list get a notice instead.
    """
    for lab in labs:
        if not (lab.sum_current and lab.sum_baseline):
            print(f"Data for {lab.name} is incomplete or not updated.")
            continue

        # The last element holds the most recently updated running total.
        latest_current = lab.sum_current[-1]
        latest_baseline = lab.sum_baseline[-1]
        difference = latest_current - latest_baseline
        if latest_baseline != 0:
            percent_difference = difference / latest_baseline * 100
        else:
            percent_difference = float('inf')

        print(f"Lab {lab.name}:")
        print(f" Current consumption up to now: {latest_current}")
        print(f" Baseline consumption up to now: {latest_baseline}")
        print(f" Difference: {difference}")
        print(f" Percentage difference: {percent_difference:.2f}%\n")
- """def get_total_lab_consumption(vue, lab, time_delta_minutes):
- while True:
- try:
- total_consumption = 0
- end_time = datetime.datetime.now(datetime.timezone.utc)
- start_time = end_time - datetime.timedelta(minutes=time_delta_minutes)
- # Attempt to get devices, this might fail if the WiFi is down
- devices = vue.get_devices()
- for gid, channel_num in zip(lab.gid, lab.channel):
- for device in devices:
- if device.device_gid == gid:
- for channel in device.channels:
- if str(channel.channel_num) == str(channel_num):
- channel_usage, _ = vue.get_chart_usage(
- channel,
- start_time,
- end_time,
- scale=Scale.MINUTE.value,
- unit=Unit.KWH.value
- )
- if channel_usage:
- total_consumption += sum(channel_usage)
- break
- return total_consumption
- except (ConnectionError, requests.exceptions.RequestException) as e:
- print(f"Connection error: {e}. Retrying in 5 seconds...")
- time.sleep(5)"""
def get_total_lab_consumption(vue, lab, time_delta_minutes):
    """Query usage for every (gid, channel) pair of ``lab`` and return the total.

    As a side effect ``lab.AC``, ``lab.Lighting`` and ``lab.CO`` are reset
    and refilled according to ``lab.consumption_type`` (1 = AC, 2 = Lighting,
    3 = CO); usage with any other type code is not counted.  The returned
    total is the sum of those three attributes.

    Connection errors are retried every 5 seconds with no upper limit, so
    this call blocks for as long as the errors persist.
    """
    while True:
        try:
            # Reset category consumptions for each call
            lab.AC = 0
            lab.Lighting = 0
            lab.CO = 0
            # Query window: the last ``time_delta_minutes`` minutes, in UTC.
            end_time = datetime.datetime.now(datetime.timezone.utc)
            start_time = end_time - datetime.timedelta(minutes=time_delta_minutes)
            devices = vue.get_devices()
            # Split the consumption per category; ``index`` selects the
            # matching entry of lab.consumption_type for this pair.
            for index, (gid, channel_num) in enumerate(zip(lab.gid, lab.channel)):
                for device in devices:
                    if device.device_gid == gid:
                        for channel in device.channels:
                            # Compared as strings, so 1 and "1" both match.
                            if str(channel.channel_num) == str(channel_num):
                                channel_usage, _ = vue.get_chart_usage(
                                    channel,
                                    start_time,
                                    end_time,
                                    scale=Scale.MINUTE.value,
                                    unit=Unit.KWH.value
                                )
                                if channel_usage:
                                    consumption_amount = sum(channel_usage)
                                    # Categorize the consumption based on the consumption type
                                    if lab.consumption_type[index] == 1: # AC
                                        lab.AC += consumption_amount
                                    elif lab.consumption_type[index] == 2: # Lighting
                                        lab.Lighting += consumption_amount
                                    elif lab.consumption_type[index] == 3: # CO
                                        lab.CO += consumption_amount
                                # NOTE(review): nesting was lost in the paste;
                                # this assumes the break ends the channel scan
                                # once the channel is found -- confirm.
                                break
            # Calculate total consumption
            total_consumption = lab.AC + lab.Lighting + lab.CO
            return total_consumption
        except (ConnectionError, requests.exceptions.RequestException) as e:
            print(f"Connection error: {e}. Retrying in 5 seconds...")
            time.sleep(5)
def print_lab_consumption_percentages(lab):
    """Print what share of the lab's total each category (AC, Lighting, CO) represents.

    A notice is printed instead when the combined total is not positive.
    """
    total_consumption = lab.AC + lab.Lighting + lab.CO
    if not total_consumption > 0:
        print(f"No consumption data available for {lab.name}.")
        return

    shares = [
        (label, (amount / total_consumption) * 100)
        for label, amount in (("AC", lab.AC), ("Lighting", lab.Lighting), ("CO", lab.CO))
    ]
    print(f"{lab.name} Consumption Percentages:")
    for label, percentage in shares:
        print(f"{label}: {percentage:.2f}%")
def calculate_all_labs_consumption(vue, labs, time_delta_minutes):
    """Return a dict mapping each lab's name to its total from ``get_total_lab_consumption``."""
    return {
        lab.name: get_total_lab_consumption(vue, lab, time_delta_minutes)
        for lab in labs
    }
# Consumption type codes used in the last Lab argument: AC = 1, Lighting = 2, CO = 3
# Lab arguments: name, baseline data, device gids, channel numbers, consumption types.
lab_a = Lab("A Lab", lab_a_base, [169101,169101,169101],[1,1,1],[1,2,3])
lab_b = Lab("B Lab", lab_b_base, [175618,169101,175618,169101],[1,2,3,4],[1,2,3,3])
lab_c = Lab("C Lab", lab_c_base, [175619,169101],[1,1],[1,1])
# NOTE(review): lab_d and lab_e list one gid/channel but two consumption
# types; get_total_lab_consumption zips gid with channel, so the second
# type code is never read -- confirm this is intended.
lab_d = Lab("SSL", lab_d_base, [228886],[1],[1, 3])
lab_e = Lab("STC", lab_e_base, [228886],[1],[1, 3])
labs = Lab.instances
rankings = []
# Folder that receives lab_consumption_data.txt (see save_lab_current_to_file).
folder_path = r"C:\Users\dgsal\OneDrive\Desktop\Lab Data"
# NOTE(review): no login call is visible in this part of the file -- confirm
# the client is authenticated before get_devices() is used.
vue = PyEmVue()
# Start-up: fetch devices once and seed every lab's current data.
devices = vue.get_devices()
device_consumptions = []
# Every lab starts with a deep copy of its baseline as its current data.
initialize_current_consumption_with_baseline(lab_a, lab_a_base)
initialize_current_consumption_with_baseline(lab_b, lab_b_base)
initialize_current_consumption_with_baseline(lab_c, lab_c_base)
initialize_current_consumption_with_baseline(lab_d, lab_d_base)
initialize_current_consumption_with_baseline(lab_e, lab_e_base)
lab_consumptions = {}
time_delta_minutes = 60 # The time delta in minutes for which you want to check the consumption
for lab in labs:
    lab_consumptions[lab.name] = get_total_lab_consumption(vue, lab, time_delta_minutes)
# Print the total consumptions
print(lab_consumptions)
print(lab_a.current)
print(lab_b.current)
print(lab_c.current)
print(lab_d.current)
print(lab_e.current)
current_pairings = randomize_pairings()
last_check_day = datetime.datetime.now().day
# Main loop: one pass per minute until interrupted from the keyboard.
while True:
    try:
        device_consumptions = []
        summed_device_consumptions = {}
        time_delta_minutes = 60 # The time delta in minutes for which you want to check the consumption
        for lab in labs:
            summed_device_consumptions[lab.name] = get_total_lab_consumption(vue, lab, time_delta_minutes)
            print_lab_consumption_percentages(lab)
        # Print the total consumptions
        print(summed_device_consumptions)
        # Update current consumption for each lab using summed values
        for lab in labs:
            total_consumption = summed_device_consumptions[lab.name]
            update_lab_current_consumption(lab, total_consumption)
        for lab in labs:
            lab.compare_consumption()
        current_time = datetime.datetime.now()
        # Check if it's a new day
        if current_time.day != last_check_day:
            finalize_daily_results() # Finalize results of the previous day
            current_pairings = randomize_pairings() # Randomize new pairings
            last_check_day = current_time.day
        # Perform scheduled tasks and battles
        for lab1, lab2 in current_pairings:
            battle(lab1, lab2, rankings)
        print(daily_winners)
        arrange_function(rankings)
        # Each of the five calls after print_differences() overwrites
        # lab.graph, so after this loop it holds lab.sum_current.
        for lab in labs:
            print(f"{lab.name} current consumption: {lab.current}")
            print(f"{lab.name} baseline: {lab.baseline}")
            lab.print_differences()
            lab.calculate_hourly_consumption()
            lab.update_graph_with_percentage_differences()
            lab.calculate_cumulative_percentage_difference()
            lab.update_graph_difference()
            lab.update_graph_current_consumption()
        sum_consumption_and_baseline(labs)
        sum_and_compare_each_lab(labs)
        print("Rankings:", rankings)
        # Append this pass's snapshot to the log file.
        save_lab_current_to_file(labs, folder_path)
        rankings = [] # restart rankings
        time.sleep(60) # change it around for testing
    except KeyboardInterrupt:
        print("Program interrupted by user. Exiting...")
        break
    except Exception as e:
        # Any other failure is reported and the whole pass is retried.
        print(f"An unexpected error occurred: {e}. Retrying in 5 seconds...")
        time.sleep(5)
        continue
- """while True:
- # Clear previous data
- rankings.clear()
- for lab in labs:
- print(f"Entering data for {lab.name}")
- try:
- consumption = float(input("Enter consumption (kWh): "))
- hour = int(input("Enter hour of day (0-23): "))
- # Assuming the current day of the week
- current_date = datetime.datetime.now()
- day_of_week = current_date.weekday()
- # Ensure there's a list for today's consumption
- while len(lab.current) <= day_of_week:
- lab.current.append([0] * 24) # Initialize hours of the day with 0 consumption
- # Update the consumption for the specific hour
- lab.current[day_of_week][hour] = consumption
- print(f"Updated {lab.name}'s consumption at {hour}:00 to {consumption} kWh.")
- # Use the manual comparison function
- lab.compare_consumption_manual(day_of_week, hour)
- except ValueError:
- print("Invalid input. Please enter valid numbers for consumption and time.")
- # Perform analysis
- battle(lab_a, lab_b, rankings)
- battle(lab_c, lab_d, rankings)
- arrange_function(rankings)
- # Print current stuff
- for lab in labs:
- print(f"{lab.name} current consumption: {lab.current}")
- print(f"{lab.name} baseline: {lab.baseline}")
- print("Rankings:", rankings)
- save_lab_current_to_file(labs, folder_path)
- if input("Do you want to continue? (yes/no): ").lower() != 'yes':
- break
- """
Editor
You can edit this paste and save as new: