-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
code cleanup and added qos api value
- Loading branch information
Mario
committed
Dec 8, 2024
1 parent
b6e79b5
commit 2560dee
Showing
5 changed files
with
144 additions
and
93 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
import configparser | ||
|
||
config = configparser.ConfigParser() | ||
config.read('app.conf') | ||
|
||
TELEGRAF_CONFIG_PATH = "telegraf.conf" | ||
LOGIN_REQUIRED = config.getboolean("auth", "login_required", fallback=False) | ||
LOGIN_PASSWORD = config.get("auth", "password", fallback="") | ||
|
||
LOGIN_ATTEMPTS: dict[str, list] = {} | ||
MAX_ATTEMPTS = int(config.get("auth", "max_attempts", fallback=3)) | ||
BLOCK_TIME = int(config.getint("auth", "block_time", fallback=60)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import re | ||
|
||
from config import TELEGRAF_CONFIG_PATH | ||
|
||
# Reads topic list from telegraf.conf | ||
def read_topics(): | ||
with open(TELEGRAF_CONFIG_PATH, "r") as file: | ||
config_data = file.read() | ||
|
||
# searches for [[inputs.mqtt_consumer]] and extracts topics | ||
mqtt_consumer_match = re.search(r'\[\[inputs\.mqtt_consumer\]\](.*?)\n\s*topics\s*=\s*\[(.*?)\]', config_data, re.DOTALL) | ||
|
||
if mqtt_consumer_match: | ||
topics_string = mqtt_consumer_match.group(2).strip() | ||
topics = re.findall(r'"([^"]*)"', topics_string) | ||
return topics | ||
return [] # In case no Topics are found | ||
|
||
# reads server address from telegraf.conf | ||
def read_server_addr(): | ||
with open(TELEGRAF_CONFIG_PATH, "r") as file: | ||
config_data = file.read() | ||
|
||
# searches for [[inputs.mqtt_consumer]] and extracts topics | ||
mqtt_consumer_match = re.search(r'\[\[inputs\.mqtt_consumer\]\](.*?)\n\s*servers\s*=\s*\[(.*?)\]', config_data, re.DOTALL) | ||
|
||
if mqtt_consumer_match: | ||
servers_string = mqtt_consumer_match.group(2).strip() | ||
servers = re.findall(r'"([^"]*)"', servers_string) | ||
return servers | ||
return [] # In case no Topics are found | ||
|
||
# reads QoS from telegraf.conf | ||
def read_qos(): | ||
with open(TELEGRAF_CONFIG_PATH, "r") as file: | ||
config_data = file.read() | ||
|
||
mqtt_consumer_match = mqtt_consumer_match = re.search(r'\[\[inputs\.mqtt_consumer\]\](.*?)^[ \t]*qos\s*=\s*(\d+)', config_data, re.MULTILINE | re.DOTALL) | ||
|
||
if mqtt_consumer_match: | ||
return mqtt_consumer_match.group(2) | ||
return [] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import re | ||
|
||
from config import TELEGRAF_CONFIG_PATH | ||
|
||
# Saves topics to telegraf.conf | ||
def save_topics(topics): | ||
with open(TELEGRAF_CONFIG_PATH, 'r') as file: | ||
config_data = file.read() | ||
|
||
# RegEx to Find topics in telegraf.conf | ||
topics_match = re.search(r'topics\s*=\s*\[(.*?)\]', config_data, re.DOTALL) | ||
if topics_match: | ||
new_topics_string = "\n ".join([f'"{topic}"' for topic in topics]) | ||
config_data = re.sub(r'topics\s*=\s*\[([^\]]+)\]', f'topics = [\n {new_topics_string},\n ]', config_data) | ||
|
||
with open(TELEGRAF_CONFIG_PATH, 'w') as file: | ||
file.write(config_data) | ||
return True | ||
|
||
return False | ||
|
||
# Saves server address to telegraf.conf | ||
def save_server_addr(server_addr): | ||
with open(TELEGRAF_CONFIG_PATH, 'r') as file: | ||
config_data = file.read() | ||
# RegEx, um die servers-Zeile zu finden | ||
mqtt_section_match = re.search(r'\[\[inputs\.mqtt_consumer\]\](.*?)(?=\n\[|\Z)', config_data, re.DOTALL) | ||
|
||
if mqtt_section_match: | ||
section_content = mqtt_section_match.group(0) | ||
# Nur aktive servers-Zeilen bearbeiten (keine auskommentierten) | ||
active_servers_match = re.search(r'^\s*servers\s*=\s*\[([^\]]+)\]', section_content, re.MULTILINE) | ||
|
||
if active_servers_match: | ||
# Neues Server-Format erstellen | ||
new_servers_string = f'{server_addr}' | ||
# Ersetzen der aktiven servers-Zeile | ||
updated_section = re.sub(r'^\s*servers\s*=\s*\[([^\]]+)\]', f' servers = {new_servers_string}', section_content, flags=re.MULTILINE) | ||
|
||
# Abschnitt in der Konfiguration aktualisieren | ||
config_data = config_data.replace(section_content, updated_section) | ||
|
||
# Datei speichern | ||
with open(TELEGRAF_CONFIG_PATH, 'w') as file: | ||
file.write(config_data) | ||
return True | ||
|
||
return False | ||
|
||
# Saves QoS to telegraf.conf | ||
def save_qos(qos): | ||
if qos not in (0, 1, 2): | ||
return False | ||
|
||
with open(TELEGRAF_CONFIG_PATH, "r") as file: | ||
config_data = file.read() | ||
|
||
# Ersetzt den vorhandenen QoS-Wert durch den neuen Wert | ||
updated_config_data = re.sub( | ||
r'(^[ \t]*qos\s*=\s*)(\d+)', # Findet die `qos =`-Zeile mit dem aktuellen Wert | ||
rf'\1{qos}', # Ersetzt den Wert durch `new_qos`, behält das Präfix bei | ||
config_data, | ||
flags=re.MULTILINE | ||
) | ||
|
||
with open(TELEGRAF_CONFIG_PATH, "w") as file: | ||
file.write(updated_config_data) | ||
return True |