Source code for src.notifications

import asyncio
import typing
from time import time

from aiogram.utils.exceptions import TelegramAPIError

from import tg
from src.database import database
from src.handlers.base import show_order
from src.i18n import i18n
from import gateway_currency_regexp

[docs]async def run_loop(): """Notify order creators about expired orders in infinite loop.""" while True: cursor = database.orders.find( {"expiration_time": {"$lte": time()}, "notify": True} ) sent = False async for order in cursor: user = await database.users.find_one({"id": order["user_id"]}) message = i18n("order_expired", locale=user["locale"]) message += "\nID: {}".format(order["_id"]) try: if sent: await asyncio.sleep(1) # Avoid Telegram limit await tg.send_message(user["chat"], message) except TelegramAPIError: pass else: await show_order(order, user["chat"], user["id"], locale=user["locale"]) sent = True finally: await database.orders.update_one( {"_id": order["_id"]}, {"$set": {"notify": False}} ) if not sent: await asyncio.sleep(1) # Reduce database load
[docs]async def order_notification(order: typing.Mapping[str, typing.Any]): """Notify users about order. Subscriptions to these notifications are managed with **/subscribe** or **/unsubscribe** commands of ``start_menu`` handlers. """ users = database.subscriptions.find( { "subscriptions": { "$elemMatch": { "buy": {"$in": [gateway_currency_regexp(order["buy"]), None]}, "sell": {"$in": [gateway_currency_regexp(order["sell"]), None]}, } }, } ) async for user in users: if user["id"] == order["user_id"]: continue order = await database.orders.find_one({"_id": order["_id"]}) # Update order if not order or order.get("archived"): return await show_order( order, user["chat"], user["id"], show_id=True, locale=user["locale"] ) await asyncio.sleep(1) # Avoid Telegram limit