-
Notifications
You must be signed in to change notification settings - Fork 2
/
app.py
129 lines (104 loc) · 4.23 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import asyncio
import re
import requests
import json
import os
import sys
from typing import Dict
from urllib.parse import urljoin
from aiosmtpd.controller import Controller
from aiosmtpd.smtp import Envelope, Session, SMTP
from sendmail import send_mail
class EmailHandler:
def __init__(self, config: Dict[str, str]):
self.receiver_regex = re.compile(r"(\+?\d+)@signal.localdomain")
self.subject_regex = re.compile(r"Subject: (.*)\n")
self.image_regex = re.compile(
r'Content-Type: image/png; name=".*"\n+((?:[A-Za-z\d+/]{4}|\n)*(?:[A-Za-z\d+/]{2}==|[A-Za-z\d+/]{3}=)?)'
)
self.config = config
async def handle_RCPT(
self, server: SMTP, session: Session, envelope: Envelope, address, rcpt_options: list[str]
) -> str:
# match and process signal number
if match := re.search(self.receiver_regex, address):
try:
number = match.group(1)
except TypeError:
return "500 Malformed receiver address"
if not address.startswith("+"):
number = "+" + number
envelope.rcpt_tos.append(number)
# simply append normal mail address
else:
envelope.rcpt_tos.append(address)
return "250 OK"
async def handle_DATA(self, server: SMTP, session: Session, envelope: Envelope) -> str:
signal_numbers = []
mail_addresses = []
for addr in envelope.rcpt_tos:
# a real email address cannot start with a special char
if addr.startswith("+"):
signal_numbers.append(addr)
else:
mail_addresses.append(addr)
# send signal message if required
if len(signal_numbers) > 0:
print("Forwarding message to signal")
success = await self.send_signal(envelope, signal_numbers)
if not success:
return "554 Sending signal message has failed"
# send email if required
if len(mail_addresses) == 0:
return "250 Message accepted for delivery"
else:
envelope.rcpt_tos = mail_addresses
print(f"Sending email via MTA. From: {envelope.mail_from} To: {envelope.rcpt_tos}")
return send_mail(
self.config["smtp_host"],
int(self.config["smtp_port"]),
self.config["smtp_user"],
self.config["smtp_passwd"],
envelope,
)
async def send_signal(self, envelope: Envelope, signal_receivers: list[str]) -> bool:
# Remove carriage returns, they break the image checking regex
content = envelope.content.decode("utf8").replace("\r", "")
if match := re.search(self.subject_regex, content):
msg = match.group(1)
else:
return False
payload = {"message": msg, "number": self.config["sender_number"], "recipients": signal_receivers}
if match := re.search(self.image_regex, content):
image = match.group(1).replace("\n", "")
payload["base64_attachments"] = [image]
headers = {"Content-Type": "application/json"}
url = urljoin(self.config["signal_rest_url"], "v2/send")
response = requests.request("POST", url, headers=headers, data=json.dumps(payload))
if response.status_code == 201:
return True
else:
return False
async def amain(loop: asyncio.AbstractEventLoop):
try:
config = {
"signal_rest_url": os.environ["SIGNAL_REST_URL"],
"sender_number": os.environ["SENDER_NUMBER"],
"smtp_host": os.environ["SMTP_HOST"],
"smtp_user": os.environ["SMTP_USER"],
"smtp_passwd": os.environ["SMTP_PASSWORD"],
"smtp_port": os.getenv("SMTP_PORT", "587"),
}
except KeyError:
sys.exit("Please set the required environment variables.")
print("Starting email2signal server")
email_handler = EmailHandler(config)
controller = Controller(email_handler, hostname="")
controller.start()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.create_task(amain(loop=loop))
try:
loop.run_forever()
except KeyboardInterrupt:
pass