-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_security.py
429 lines (347 loc) · 17.5 KB
/
test_security.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# THIS PLUGIN IS ON DEVELOPMENT
# pip3 install netifaces
# pip3 install scapy
# pip3 install flask?
import logging
from urllib import request
import pwnagotchi.plugins as plugins
from pwnagotchi.ui.components import LabeledValue
from pwnagotchi.ui.view import BLACK
import pwnagotchi.ui.fonts as fonts
import netifaces
import scapy
from flask import Flask, jsonify
app = Flask(__name__)
class SecurityMonitor(plugins.Plugin):
__GitHub__ = "https://github.com/MaliosDark/pwnagotchi-plugins"
__author__ = "MaliosDark, (itsdarklikehell [email protected])"
__version__ = "1.0.8.2"
__license__ = "GPL3"
__description__ = "LAN Security Monitor Plugin for Pwnagotchi"
__name__ = "SecurityMonitor"
__help__ = "LAN Security Monitor Plugin for Pwnagotchi"
__dependencies__ = {
"apt": ["none"],
"pip": ["scapy"],
}
__defaults__ = {
"enabled": False,
}
def __init__(self):
logging.debug("Security Monitor plugin created")
def on_loaded(self):
logging.info(f"[{self.__class__.__name__}] plugin loaded")
def on_ui_setup(self, ui):
# Add custom UI elements for security status
ui.add_element(
"security_status",
LabeledValue(
color=BLACK,
label="Security ",
value=" Safe",
position=(ui.width() / 2 - 25, 0),
label_font=fonts.Bold,
text_font=fonts.Medium,
),
)
# Add a new UI element to display security warnings
ui.add_element(
"security_warnings",
LabeledValue(
color=BLACK,
label="Security Warnings ",
value= security_warnings,
position=(ui.width() / 2 - 25, 2),
label_font=fonts.Bold,
text_font=fonts.Small,
),
)
self.ui = ui
def on_wifi_update(self, agent, access_points):
# Analyze WiFi updates and check for security issues
security_status, security_warnings = self.check_security(access_points)
agent.set("security_status", security_status)
logging.info("on_wifi_update called")
# Update the UI element with security warnings
self.ui.set("security_warnings", ", ".join(security_warnings))
def check_security(self, access_points):
# Initialize a list to store security issues
security_warnings = []
# Iterate through each access point
for ap in access_points:
essid = ap.get("essid", "Unknown")
# Check for common security issues
encryption = ap.get("encryption", "")
if "WEP" in encryption:
security_warnings.append(f"Detected WEP network - {essid}")
if "WPA" in encryption:
# You can add more specific checks for WPA configurations, e.g., WPA1, WPA2, WPA3
security_warnings.append(f"Detected WPA network - {essid}")
# Check for weak passwords (you may need to customize this based on your criteria)
if ap.get("password") in ["admin", "password", "123456"]:
security_warnings.append(f"Weak password for network - {essid}")
# Check for open networks
if "OPN" in encryption:
security_warnings.append(f"Detected open network - {essid}")
# Add more checks as needed for your specific security criteria
if security_warnings:
# If there are security issues, log and return 'Warning'
for warning in security_warnings:
logging.info(f"Security Warning: {warning}")
return "Warning", security_warnings
else:
# If no security issues are found, log and return 'Safe'
logging.info("No security issues detected.")
return "Safe", []
def on_handshake(self, agent, filename, access_point, client_station):
# Called when a new handshake is captured
logging.info(f"Handshake captured from {access_point} to {client_station}")
# Save the handshake file for further analysis or processing
self.save_handshake(filename)
# You can implement further actions, such as alerting or analyzing the handshake
self.analyze_handshake(access_point, client_station)
def on_deauthentication(self, agent, access_point, client_station):
# Called when the agent is deauthenticating a client station from an AP
logging.info(
f"Deauthentication detected from {client_station} to {access_point}"
)
# You can implement further actions, such as alerting or blocking the client station
self.block_client_station(client_station)
self.alert_deauthentication(access_point, client_station)
# Additional methods for further actions
def save_handshake(self, filename):
# Implement logic to save the handshake file
logging.info(f"Saving handshake file: {filename}")
# You can use additional libraries or tools for handshake file processing
def analyze_handshake(self, access_point, client_station):
# Implement logic to analyze the handshake data
logging.info(f"Analyzing handshake from {client_station} to {access_point}")
# You can perform security checks, extract information, or trigger further actions
def block_client_station(self, client_station):
# Implement logic to block the client station (e.g., through MAC address filtering)
logging.info(f"Blocking client station: {client_station}")
# You may need to interact with the network infrastructure for effective blocking
def alert_deauthentication(self, access_point, client_station):
# Implement logic to send alerts or notifications about deauthentication events
logging.info(
f"Alerting about deauthentication from {client_station} to {access_point}"
)
# You can use external services or APIs for alerting
def on_bored(self, agent):
# Called when the status is set to bored
logging.info("Pwnagotchi is bored. Initiating network scan.")
scan_result = self.scan_network()
logging.info(f"Scan Result: {scan_result}")
self.ui.set("security_warnings", scan_result)
# You can implement further actions, such as alerting or analyzing the scan result
self.analyze_network_scan(scan_result)
def on_excited(self, agent):
# Called when the status is set to excited
logging.info("Pwnagotchi is excited. Performing deep packet inspection.")
deep_packet_result = self.deep_packet_inspection()
logging.info(f"Deep Packet Inspection Result: {deep_packet_result}")
# You can implement further actions, such as alerting or analyzing the deep packet inspection result
self.analyze_deep_packet_inspection(deep_packet_result)
def on_rebooting(self, agent):
# Called when the agent is rebooting the board
logging.info("Rebooting Pwnagotchi.")
# You can implement further actions, such as cleanup or saving state before reboot
self.cleanup_before_reboot()
def analyze_network_scan(self, scan_result):
# Implement logic to analyze the network scan result
logging.info("Analyzing network scan result.")
# Update the UI element with the list of discovered devices
discovered_devices = [
f"{device['ip']} ({device['mac']})" for device in scan_result
]
devices_info = ", ".join(discovered_devices)
self.ui.set("discovered_devices", devices_info)
logging.info(f"Discovered Devices: {devices_info}")
# You can perform security checks, extract information, or trigger further action
def analyze_deep_packet_inspection(self, deep_packet_result):
# Implement logic to analyze the deep packet inspection result
logging.info("Analyzing deep packet inspection result.")
# Example: Check for specific patterns or anomalies in the captured packets
if "malicious_pattern" in deep_packet_result:
logging.warn("Malicious pattern detected in the network traffic.")
# You can perform additional actions, such as alerting or blocking
# Example: Extract information from the captured packets
extracted_info = self.extract_info_from_packets(deep_packet_result)
logging.info(f"Extracted information: {extracted_info}")
# You can use the extracted information for further analysis or actions
# Example: Perform additional security checks based on the inspection result
security_checks_passed = self.perform_security_checks(deep_packet_result)
if security_checks_passed:
logging.info("Deep packet inspection passed security checks.")
else:
logging.warn("Security checks failed. Potential security issues detected.")
# You can take appropriate actions based on the security check results
def extract_info_from_packets(self, deep_packet_result):
# Implement logic to extract information from the captured packets
# For example, you can extract source/destination IP addresses, protocols, etc.
extracted_info = {} # Replace with actual extraction logic
return extracted_info
def perform_security_checks(self, deep_packet_result):
# Implement additional security checks based on the deep packet inspection result
# For example, check for known vulnerabilities or suspicious behavior
# Return True if all checks pass, False otherwise
return True # Replace with actual security check logic
def cleanup_before_reboot(self):
# Implement logic to perform cleanup or save state before rebooting
logging.info("Performing cleanup before reboot.")
# Example: Save important information or state to a file
self.save_state_to_file()
# Example: Close network connections or release resources
self.close_network_connections()
# Example: Gracefully shut down any running processes or services
self.shutdown_processes()
def save_state_to_file(self):
# Implement logic to save important information or state to a file
# For example, save configuration settings, captured data, etc.
logging.info("Saving state to a file.")
# Replace with actual saving logic
def close_network_connections(self):
# Implement logic to close network connections or release resources
# For example, close sockets, disconnect from servers, etc.
logging.info("Closing network connections.")
# Replace with actual closing logic
def shutdown_processes(self):
# Implement logic to gracefully shut down any running processes or services
logging.info("Shutting down processes.")
# Replace with actual shutdown logic
def scan_network(self):
# Implement network scanning logic here using scapy
logging.info("Scanning network to discover devices.")
# Obtain the local IP address of the device's interface
local_ip = self.get_local_ip()
# Use ARP request to discover devices on the local network
arp_request = scapy.ARP(pdst=f"{local_ip}/24")
broadcast = scapy.Ether(dst="ff:ff:ff:ff:ff:ff")
arp_request_broadcast = broadcast / arp_request
answered_list = scapy.srp(arp_request_broadcast, timeout=1, verbose=False)[0]
# Extract device information from the answered list
devices_list = []
for element in answered_list:
device_info = {"ip": element[1].psrc, "mac": element[1].hwsrc}
devices_list.append(device_info)
return devices_list
def get_local_ip(self):
# Obtain the local IP address of the device's interface
try:
# Get the default gateway interface
default_gateway = netifaces.gateways()["default"][netifaces.AF_INET][1]
# Get the local IP address associated with the default gateway interface
local_ip = netifaces.ifaddresses(default_gateway)[netifaces.AF_INET][0][
"addr"
]
return local_ip
except KeyError as e:
logging.error(f"KeyError: {e}. Unable to determine the local IP address.")
return None
except IndexError as e:
logging.error(f"IndexError: {e}. Unable to determine the local IP address.")
return None
except netifaces.netifacesError as e:
logging.error(
f"netifacesError: {e}. Unable to determine the local IP address."
)
return None
except Exception as e:
logging.error(f"Error obtaining local IP: {e}")
return None
def deep_packet_inspection(self):
try:
# Implement deep packet inspection logic here using scapy or other libraries
logging.info("Performing deep packet inspection.")
# Example: Capture the first 10 packets on the network
captured_packets = scapy.sniff(count=10)
# Analyze the captured packets (replace with your specific analysis logic)
for packet in captured_packets:
self.analyze_packet(packet)
# If you find a security issue, return an appropriate message
if self.security_issue_detected:
return "Security issues found."
else:
return "No security issues found."
except scapy.Scapy_Exception as e:
logging.error(f"Scapy exception during deep packet inspection: {e}")
return "Scapy exception during deep packet inspection."
except Exception as e:
logging.error(f"Error during deep packet inspection: {e}")
return "Error during deep packet inspection."
# Add more methods as needed for other events you want to monitor
# ...
def analyze_packet(self, packet):
# Implement your specific analysis logic for each packet type
try:
# Example: Check for a specific pattern or anomaly in the packet
if "malicious_pattern" in str(packet.payload):
logging.warn("Malicious pattern detected in the network traffic.")
self.security_issue_detected = True
# You can perform additional actions, such as alerting or blocking
# Example: Extract information from the packet
extracted_info = self.extract_info_from_packet(packet)
logging.info(f"Extracted information from packet: {extracted_info}")
# You can use the extracted information for further analysis or actions
# Example: Perform additional security checks based on the packet content
security_checks_passed = self.perform_security_checks(packet)
if not security_checks_passed:
logging.warn(
"Security checks failed. Potential security issues detected."
)
self.security_issue_detected = True
# You can take appropriate actions based on the security check results
except scapy.Scapy_Exception as e:
logging.error(f"Scapy exception during packet analysis: {e}")
except Exception as e:
logging.error(f"Error during packet analysis: {e}")
def extract_info_from_packet(self, packet):
extracted_info = {}
# Example: Extract source and destination IP addresses
if "IP" in packet:
extracted_info["source_ip"] = packet["IP"].src
extracted_info["destination_ip"] = packet["IP"].dst
# Example: Extract protocol information
if "IP" in packet:
extracted_info["protocol"] = packet["IP"].proto
# Add more extraction logic based on your specific requirements
return extracted_info
def perform_security_checks(self, packet):
# Example: Check for known vulnerabilities in the packet content
if "malicious_pattern" in str(packet.payload):
logging.warn("Malicious pattern detected in the packet.")
return False # Security check failed
# Example: Check for suspicious behavior or conditions
if "SuspiciousHeader" in packet:
logging.warn("Suspicious header detected in the packet.")
return False # Security check failed
# Add more security check logic based on your specific requirements
# If all security checks pass, return True
return True
# Optionally, you can connect to the Pwnagotchi's AI for learning interactions
def on_ai_policy(self, agent, policy):
# Called when the AI finds a new set of parameters
# For demonstration purposes, log the AI policy
logging.info(f"AI Policy: {policy}")
# You can implement further actions, such as adjusting security thresholds based on AI insights
@app.route('/plugins/test_security', methods=['GET'])
def test_security():
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>Test Security Plugin</title>
</head>
<body>
<h1>Test Security Plugin</h1>
<p>Security Status: Safe</p>
<p>Security Warnings: None</p>
</body>
</html>
"""
# Retornar el HTML como respuesta
return make_response(html_content)
def on_webhook(self, path, request):
logging.info(f"[{self.__class__.__name__}] webhook pressed")
if __name__ == '__main__':
app.run(debug=True)