Skip to content

Commit

Permalink
black
Browse files Browse the repository at this point in the history
  • Loading branch information
wicol committed Jun 24, 2021
1 parent 8eefa11 commit ff2d24b
Showing 1 changed file with 67 additions and 60 deletions.
127 changes: 67 additions & 60 deletions emqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,34 @@


defaults = {
'SMTP_PORT': 1025,
'MQTT_HOST': 'localhost',
'MQTT_PORT': 1883,
'MQTT_USERNAME': '',
'MQTT_PASSWORD': '',
'MQTT_TOPIC': 'emqtt',
'MQTT_PAYLOAD': 'ON',
'MQTT_RESET_TIME': '300',
'MQTT_RESET_PAYLOAD': 'OFF',
'SAVE_ATTACHMENTS': 'True',
'SAVE_ATTACHMENTS_DURING_RESET_TIME': 'False',
'DEBUG': 'False'
"SMTP_PORT": 1025,
"MQTT_HOST": "localhost",
"MQTT_PORT": 1883,
"MQTT_USERNAME": "",
"MQTT_PASSWORD": "",
"MQTT_TOPIC": "emqtt",
"MQTT_PAYLOAD": "ON",
"MQTT_RESET_TIME": "300",
"MQTT_RESET_PAYLOAD": "OFF",
"SAVE_ATTACHMENTS": "True",
"SAVE_ATTACHMENTS_DURING_RESET_TIME": "False",
"DEBUG": "False",
}
config = {
setting: os.environ.get(setting, default)
for setting, default in defaults.items()
setting: os.environ.get(setting, default) for setting, default in defaults.items()
}
# Boolify
for key, value in config.items():
if value == 'True':
if value == "True":
config[key] = True
elif value == 'False':
elif value == "False":
config[key] = False

level = logging.DEBUG if config['DEBUG'] else logging.INFO
level = logging.DEBUG if config["DEBUG"] else logging.INFO

log = logging.getLogger('emqtt')
log = logging.getLogger("emqtt")
log.setLevel(level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# Log to console
ch = logging.StreamHandler()
Expand All @@ -52,48 +51,51 @@
class EMQTTHandler:
def __init__(self, loop):
self.loop = loop
self.reset_time = int(config['MQTT_RESET_TIME'])
self.reset_time = int(config["MQTT_RESET_TIME"])
self.handles = {}
self.quit = False
signal.signal(signal.SIGTERM, self.set_quit)
signal.signal(signal.SIGINT, self.set_quit)
if config['SAVE_ATTACHMENTS']:
log.info('Configured to save attachments')
if config["SAVE_ATTACHMENTS"]:
log.info("Configured to save attachments")

async def handle_DATA(self, server, session, envelope):
log.debug('Message from %s', envelope.mail_from)
log.debug("Message from %s", envelope.mail_from)
msg = email.message_from_bytes(envelope.original_content, policy=default)
log.debug(
'Message data (truncated): %s',
envelope.content.decode('utf8', errors='replace')[:250]
"Message data (truncated): %s",
envelope.content.decode("utf8", errors="replace")[:250],
)
topic = '{}/{}'.format(config['MQTT_TOPIC'], envelope.mail_from.replace('@', ''))
self.mqtt_publish(topic, config['MQTT_PAYLOAD'])
topic = "{}/{}".format(
config["MQTT_TOPIC"], envelope.mail_from.replace("@", "")
)
self.mqtt_publish(topic, config["MQTT_PAYLOAD"])

# Save attached files if configured to do so.
if config['SAVE_ATTACHMENTS'] and (
# Don't save them during reset time unless configured to do so.
topic not in self.handles
or config['SAVE_ATTACHMENTS_DURING_RESET_TIME']):
if config["SAVE_ATTACHMENTS"] and (
# Don't save them during reset time unless configured to do so.
topic not in self.handles
or config["SAVE_ATTACHMENTS_DURING_RESET_TIME"]
):
log.debug(
'Saving attachments. Topic "%s" aldready triggered: %s, '
'Save attachment override: %s',
topic,
topic in self.handles,
config['SAVE_ATTACHMENTS_DURING_RESET_TIME']
"Save attachment override: %s",
topic,
topic in self.handles,
config["SAVE_ATTACHMENTS_DURING_RESET_TIME"],
)
for att in msg.iter_attachments():
# Just save images
if not att.get_content_type().startswith('image'):
if not att.get_content_type().startswith("image"):
continue
filename = att.get_filename()
image_data = att.get_content()
file_path = os.path.join('attachments', filename)
log.info('Saving attached file %s to %s', filename, file_path)
with open(file_path, 'wb') as f:
file_path = os.path.join("attachments", filename)
log.info("Saving attached file %s to %s", filename, file_path)
with open(file_path, "wb") as f:
f.write(image_data)
else:
log.debug('Not saving attachments')
log.debug("Not saving attachments")
log.debug(self.handles)

# Cancel any current scheduled resets of this topic
Expand All @@ -103,52 +105,57 @@ async def handle_DATA(self, server, session, envelope):
if self.reset_time:
# Schedule a reset of this topic
self.handles[topic] = self.loop.call_later(
self.reset_time,
self.reset,
topic
self.reset_time, self.reset, topic
)
return '250 Message accepted for delivery'
return "250 Message accepted for delivery"

def mqtt_publish(self, topic, payload):
log.info('Publishing "%s" to %s', payload, topic)
try:
publish.single(
topic,
payload,
hostname=config['MQTT_HOST'],
port=int(config['MQTT_PORT']),
hostname=config["MQTT_HOST"],
port=int(config["MQTT_PORT"]),
auth={
'username': config['MQTT_USERNAME'],
'password': config['MQTT_PASSWORD']
} if config['MQTT_USERNAME'] else None
"username": config["MQTT_USERNAME"],
"password": config["MQTT_PASSWORD"],
}
if config["MQTT_USERNAME"]
else None,
)
except Exception as e:
log.exception('Failed publishing')
log.exception("Failed publishing")

def reset(self, topic):
log.info(f'Resetting topic {topic}')
log.info(f"Resetting topic {topic}")
self.handles.pop(topic)
self.mqtt_publish(topic, config['MQTT_RESET_PAYLOAD'])
self.mqtt_publish(topic, config["MQTT_RESET_PAYLOAD"])

def set_quit(self, *args):
log.info('Quitting...')
log.info("Quitting...")
self.quit = True


if __name__ == '__main__':
log.debug(', '.join([f'{k}={v}' for k, v in config.items()]))
if __name__ == "__main__":
log.debug(", ".join([f"{k}={v}" for k, v in config.items()]))

# If there's a dir called log - set up a filehandler
if os.path.exists('log'):
log.info('Setting up a filehandler')
fh = logging.FileHandler('log/emqtt.log')
if os.path.exists("log"):
log.info("Setting up a filehandler")
fh = logging.FileHandler("log/emqtt.log")
fh.setFormatter(formatter)
log.addHandler(fh)

loop = asyncio.get_event_loop()
c = Controller(handler=EMQTTHandler(loop), loop=loop, hostname='0.0.0.0', port=config['SMTP_PORT'])
c = Controller(
handler=EMQTTHandler(loop),
loop=loop,
hostname="0.0.0.0",
port=config["SMTP_PORT"],
)
c.start()
log.info('Running')
log.info("Running")
try:
while not c.handler.quit:
time.sleep(0.5)
Expand Down

0 comments on commit ff2d24b

Please sign in to comment.