# Copyright (C) 2019 alfred richardsn
#
# This file is part of TellerBot.
#
# TellerBot is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with TellerBot. If not, see <https://www.gnu.org/licenses/>.
import typing
from contextvars import ContextVar
from aiogram.dispatcher.storage import BaseStorage
from motor.motor_asyncio import AsyncIOMotorClient
from src.config import config
try:
with open(config.DATABASE_PASSWORD_FILENAME, "r") as password_file:
client = AsyncIOMotorClient(
"mongodb://{username}:{password}@{host}:{port}/{name}".format(
host=config.DATABASE_HOST,
port=config.DATABASE_PORT,
username=config.DATABASE_USERNAME,
password=password_file.read().strip(),
name=config.DATABASE_NAME,
)
)
except (AttributeError, FileNotFoundError):
client = AsyncIOMotorClient(config.DATABASE_HOST)
database = client[config.DATABASE_NAME]
database_user: ContextVar[typing.Mapping[str, typing.Any]] = ContextVar("database_user")
[docs]class MongoStorage(BaseStorage):
"""MongoDB asynchronous storage for FSM using motor."""
[docs] async def get_state(self, user: int, **kwargs) -> typing.Optional[str]:
"""Get current state of user with Telegram ID ``user``."""
document = await database.users.find_one({"id": user})
return document.get("state") if document else None
[docs] async def set_state(
self, user: int, state: typing.Optional[str] = None, **kwargs
) -> None:
"""Set new state ``state`` of user with Telegram ID ``user``."""
if state is None:
await database.users.update_one({"id": user}, {"$unset": {"state": True}})
else:
await database.users.update_one({"id": user}, {"$set": {"state": state}})
[docs] async def get_data(self, user: int, **kwargs) -> typing.Dict:
"""Get state data of user with Telegram ID ``user``."""
document = await database.users.find_one({"id": user})
return document.get("data", {})
[docs] async def set_data(
self, user: int, data: typing.Optional[typing.Dict] = None, **kwargs
) -> None:
"""Set state data ``data`` of user with Telegram ID ``user``."""
if data is None:
await database.users.update_one({"id": user}, {"$unset": {"data": True}})
else:
await database.users.update_one({"id": user}, {"$set": {"data": data}})
[docs] async def update_data(
self, user: int, data: typing.Optional[typing.Dict] = None, **kwargs
) -> None:
"""Update data of user with Telegram ID ``user``."""
if data is None:
data = {}
data.update(kwargs)
await database.users.update_one(
{"id": user},
{"$set": {f"data.{key}": value for key, value in data.items()}},
)
[docs] async def reset_state(self, user: int, with_data: bool = True, **kwargs):
"""Reset state for user with Telegram ID ``user``."""
update = {"$unset": {"state": True}}
if with_data:
update["$unset"]["data"] = True
await database.users.update_one({"id": user}, update)
[docs] async def finish(self, user: int, **kwargs):
"""Finish conversation with user."""
await self.set_state(user=user, state=None)
[docs] async def wait_closed(self) -> None:
"""Do nothing.
Motor client does not use this method.
"""
[docs] async def close(self):
"""Disconnect from MongoDB."""
client.close()