dotfiles/home/utils/services.nix

65 lines
1.8 KiB
Nix

{ pkgs, scripts }:
let
# Executable `telegram-notify.py <unit>`: appends the current-boot user journal
# of <unit> to /home/server/mail.log and reports the failure to a Telegram chat.
# NOTE: inside the indented string below, avoid two consecutive single quotes
# and dollar-brace sequences; Nix would interpret them.
notify_script = (pkgs.writers.writePython3Bin "telegram-notify.py" {
  libraries = [ pkgs.python3Packages.python-telegram-bot ];
} ''
  import asyncio
  import os
  import socket
  import subprocess
  import sys
  import traceback

  import telegram

  # Telegram rejects text messages longer than 4096 characters.
  MAX_MESSAGE_LEN = 4096
  LOG_PATH = "/home/server/mail.log"
  CHAT_ID = 125754925
  # NOTE(review): this is a credential committed in plain text, and it is
  # copied into the Nix store with the script. Rotate it and provide the
  # new one through TELEGRAM_BOT_TOKEN; the literal is kept only as a
  # fallback so existing deployments keep working unchanged.
  FALLBACK_TOKEN = "381718873:AAElFmI2BDjumCehhWicuksE0vutrPSkoGA"


  async def run(unit):
      """Log the journal of `unit` and report the failure via Telegram."""
      text = subprocess.check_output(
          ["journalctl", "--user", "-u", unit, "-b"]
      ).decode("utf-8", errors="replace")
      with open(LOG_PATH, "a") as f:
          # Newline keeps the separator from fusing with the first log line.
          f.write("===========================================\n")
          f.write(text)
      token = os.environ.get("TELEGRAM_BOT_TOKEN", FALLBACK_TOKEN)
      bot = telegram.Bot(token=token)
      hostname = socket.gethostname()
      await bot.send_message(
          CHAT_ID,
          f"{hostname} encountered an error in the service: {unit}"
      )
      # The full journal is in LOG_PATH; only the most recent part is sent,
      # because an over-long message would be rejected as a whole.
      tail = text[-MAX_MESSAGE_LEN:]
      if tail.strip():
          await bot.send_message(CHAT_ID, tail)


  if __name__ == "__main__":
      try:
          asyncio.run(run(sys.argv[1]))
      except Exception:
          # Best effort, as before: report the problem on stderr but still
          # exit 0, so a failing notifier is not itself a failed unit.
          traceback.print_exc()
'');
# Build the timer unit for script `name`: fires on the calendar expression
# `cfg.when`, activates `<name>.service`, and is installed under timers.target.
mkTimer = name: cfg: {
  Install = { WantedBy = [ "timers.target" ]; };
  Timer.Unit = name + ".service";
  Timer.OnCalendar = cfg.when;
  Timer.Persistent = true;
};
# Build the service unit for script `name`: runs `cfg.script` and, on failure,
# starts the `status_notify@` template instantiated with this unit's name (%n).
mkService = name: cfg: {
  Unit = {
    Description = name;
    OnFailure = "status_notify@%n.service";
  };
  Install.WantedBy = [ "default.target" ];
  Service.ExecStart = cfg.script;
};
in
{
  # One user service per entry in `scripts`, plus the templated notifier that
  # the generated services reference in their OnFailure setting.
  systemd.user.services = pkgs.lib.mapAttrs mkService scripts // {
    "status_notify@" =
      let
        base = mkService "status_notify@" {
          script = "${notify_script}/bin/telegram-notify.py %i";
        };
      in
      # The notifier must not carry OnFailure itself: otherwise a failing
      # notifier would start another notifier instance for its own unit name,
      # and so on without bound.
      base // { Unit = builtins.removeAttrs base.Unit [ "OnFailure" ]; };
  };
  # One timer per entry in `scripts`, each activating the service of the same name.
  systemd.user.timers = pkgs.lib.mapAttrs mkTimer scripts;
  # Remember to enable each timer once (a reboot should also activate them):
  #   systemctl --user enable --now <script>.timer
}