-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpeer_broadcast_moonSatellite.py
170 lines (138 loc) · 6.66 KB
/
peer_broadcast_moonSatellite.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
import socket
import threading
import os
import glob
import time
# Rahul main contribution
class Peer:
    """A TCP peer that relays "Delete"/"Deleted" text messages between nodes.

    The peer runs a listening server (one thread per accepted connection),
    opens outgoing connections to a fixed list of known peers, and keeps
    track of which device IDs have been seen for every message so that a
    background loop can notify Earth once all devices have acknowledged it.

    Wire format (as parsed by this class):
      * ``"<sender> : Deleted <file>"`` -- an acknowledgement from ``<sender>``.
      * ``"... Delete <file>"``         -- a request to delete ``<file>`` locally.
    """

    # Directory tree searched when a "Delete <file>" request arrives.
    DATA_ROOT = "/users/pgrad/singhr6/group24/data_test"

    def __init__(self, host, port, known_peers):
        """Store the configuration; no sockets are opened here.

        Args:
            host: Address the server binds to.
            port: TCP port the server binds to.
            known_peers: Iterable of ``(host, port)`` tuples to connect to.
        """
        self.current_id = "Moon_Satellite"
        self.host = host
        self.port = port
        self.known_peers = known_peers  # (host, port) pairs to dial on start
        self.peers = []  # (host, port, socket) for every outgoing connection
        self.message_cache = []  # Messages sent through broadcast_message
        self.message_receivers = {}  # message text -> list of device IDs
        self.lock = threading.Lock()  # Guards the three containers above

    def start(self):
        """Start the server thread, wait for Enter, then dial known peers."""
        server_thread = threading.Thread(target=self.start_server)
        server_thread.start()
        input("Press Enter to start connecting to peers\n")
        print("Connecting to other peers...\n")
        self.connect_to_known_peers()

    def start_server(self):
        """Accept connections forever, handling each one in its own thread."""
        # The context manager releases the listening socket if the accept
        # loop is ever left through an exception.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            server.bind((self.host, self.port))
            server.listen()
            print(f"Server listening on {self.host}:{self.port}")
            while True:
                client_socket, _ = server.accept()
                threading.Thread(
                    target=self.handle_client, args=(client_socket,)
                ).start()

    def connect_to_known_peers(self):
        """Try to connect to every ``(host, port)`` in ``known_peers``."""
        for host, port in self.known_peers:
            self.connect_to_peer(host, port)

    def connect_to_peer(self, host, port):
        """Open an outgoing connection and replay tracked messages to it.

        Failures are reported on stdout and never raised. A socket whose
        ``connect`` failed is closed instead of being leaked.
        """
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        connected = False
        try:
            client.connect((host, port))
            connected = True
            print(f"Connected to {host}:{port}")
            with self.lock:
                # Registered under the lock because broadcast_message
                # iterates self.peers while holding it.
                self.peers.append((host, port, client))
                # Send any cached messages to the newly connected peer.
                # NOTE(review): handle_client records device IDs, not host
                # addresses, so this filter presumably never excludes
                # anything -- confirm the intended key.
                for cached_message, receivers in self.message_receivers.items():
                    if host not in receivers:
                        client.sendall(cached_message.encode('utf-8'))
                        print(f"Sent cached message to {host}:{port}")
        except Exception as e:
            print(f"Error connecting to peer: {e}")
            if not connected:
                client.close()

    def delete_file(self, file_name, root_directory):
        """Delete every file called ``file_name`` anywhere below ``root_directory``.

        ``file_name`` originates from the network, so it is treated as a
        literal bare file name: names containing a path component are
        refused (an absolute path would otherwise escape ``root_directory``)
        and glob metacharacters are escaped rather than expanded.
        """
        if not file_name or os.path.basename(file_name) != file_name:
            print(f"No matching files found for {file_name} in {root_directory}")
            return

        # Create the file path pattern; '**' matches any directory depth.
        file_path_pattern = os.path.join(
            root_directory, '**', glob.escape(file_name)
        )
        matching_files = glob.glob(file_path_pattern, recursive=True)

        if not matching_files:
            print(f"No matching files found for {file_name} in {root_directory}")
            return

        for file_path in matching_files:
            try:
                os.remove(file_path)
                print(f"Deleted file: {file_path}")
            except Exception as e:
                print(f"Error deleting file {file_path}: {e}")

    def send_deleted_message(self, host, port, deleted_file, repetitions=20, interval=450):
        """Send ``"<id> : Deleted <file>"`` to ``host:port`` several times.

        Args:
            host: Destination address.
            port: Destination TCP port.
            deleted_file: File name reported as deleted.
            repetitions: How many times the message is sent.
            interval: Seconds to wait between two consecutive sends.

        Errors are printed, not raised.
        """
        message = f"{self.current_id} : Deleted {deleted_file}"
        payload = message.encode('utf-8')
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
                client_socket.connect((host, port))
                for attempt in range(repetitions):
                    # sendall retries until the whole payload is written.
                    client_socket.sendall(payload)
                    print(f"Sent deleted message to {host}:{port}")
                    # Waiting after the final send would only keep the
                    # connection open for nothing.
                    if attempt < repetitions - 1:
                        time.sleep(interval)
        except Exception as e:
            print(f"Error sending deleted message: {e}")

    def _record_receiver(self, message, device_id):
        """Note that ``device_id`` has seen ``message``; caller holds the lock."""
        receivers = self.message_receivers.setdefault(message, [])
        if device_id not in receivers:
            receivers.append(device_id)

    def handle_client(self, client_socket):
        """Read messages from one connection until it closes or errors.

        "Deleted" acknowledgements record the sender (the text before the
        first ':'); "Delete" requests remove the named file below
        ``DATA_ROOT`` and record this peer as a receiver. The socket is
        always closed when the loop ends.
        """
        try:
            while True:
                data = client_socket.recv(1024).decode('utf-8')
                if not data:
                    break  # Remote side closed the connection.
                print(f"Received message: {data}")

                # "Deleted" must be tested first: it contains "Delete".
                if "Deleted" in data:
                    sender_id = data.split(":")[0].strip()
                    with self.lock:
                        self._record_receiver(data, sender_id)
                elif "Delete" in data:
                    file_to_delete = data.split("Delete")[1].strip()
                    self.delete_file(file_to_delete, self.DATA_ROOT)
                    with self.lock:
                        self._record_receiver(data, self.current_id)
        except Exception as e:
            print(f"Error receiving message: {e}")
        finally:
            client_socket.close()

    def broadcast_message(self, message):
        """Cache ``message`` and send it to every connected peer.

        A failure on one peer is printed and does not stop the others.
        """
        payload = message.encode('utf-8')
        with self.lock:
            self.message_cache.append(message)  # Cache the message
            for peer in self.peers:
                _, _, client = peer
                try:
                    client.sendall(payload)
                except Exception as e:
                    print(f"Error sending message to {peer}: {e}")

    def _fully_acknowledged_messages(self, devices_to_check):
        """Return messages whose receivers equal ``devices_to_check``.

        Caller must hold ``self.lock``.
        """
        required = set(devices_to_check)
        return [
            message
            for message, receivers in self.message_receivers.items()
            # Receivers are device-ID strings; compare them whole.
            if set(receivers) == required
        ]

    def check_and_remove_messages(self):
        """Every 7.5 minutes, retire messages acknowledged by all devices.

        Retired messages are dropped from the tracking structures and a
        "Deleted" notification for the file is sent to Earth. Runs forever.
        """
        devices_to_check = {"Curiosity_Rover", "Mars_Rover", "Lander_Module", "Moon_Satellite"}

        while True:
            time.sleep(450)  # Sleep for 7.5 minutes

            with self.lock:
                completed = self._fully_acknowledged_messages(devices_to_check)
                for message in completed:
                    self.message_receivers.pop(message, None)
                    # Only messages sent via broadcast_message are cached.
                    if message in self.message_cache:
                        self.message_cache.remove(message)
                    print(f"Removed message from cache: {message}")

            # Notify outside the lock: send_deleted_message sleeps between
            # repetitions and would otherwise stall every handler thread.
            for message in completed:
                _, separator, remainder = message.partition(": Delete ")
                if not separator:
                    # Not a "<sender>: Delete <file>" request, so there is
                    # no file name to report.
                    continue
                # Send a deleted message to IP_EARTH:PORT_EARTH.
                # NOTE(review): PORT_EARTH is not defined in the visible
                # file; presumably a placeholder filled in at deployment.
                self.send_deleted_message(
                    "IP_EARTH", PORT_EARTH, remainder.strip(),
                    repetitions=20, interval=450,
                )
if __name__ == "__main__":
    # Identity and listening endpoint of this node. The IP_* strings and
    # PORT_* names are placeholders exactly as they appear in the file.
    current_id = "Moon_Satellite"
    current_host = "IP_MOON_SATELLITE"
    current_port = PORT_MOON_SATELLITE

    # Every other node this peer dials once the operator presses Enter.
    known_peers = [
        ("IP_CURIOSITY_ROVER", PORT_CURIOSITY_ROVER),
        ("IP_MARS_ROVER", PORT_MARS_ROVER),
        ("IP_LANDER_MODULE", PORT_LANDER_MODULE),
    ]

    peer = Peer(host=current_host, port=current_port, known_peers=known_peers)

    # The periodic cache clean-up runs on its own thread, started before
    # the server and the outgoing connections.
    cleanup_thread = threading.Thread(target=peer.check_and_remove_messages)
    cleanup_thread.start()

    peer.start()