MonoTG/monobank_bot.py
2025-05-29 11:12:24 +03:00

190 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""Monobank Telegram Bot (Async)
Quick start:
pip install -r requirements.txt
cp .env.example .env
python monobank_bot.py
"""
from __future__ import annotations
import asyncio, logging, os, re, time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List
import aiohttp
import html
from dotenv import load_dotenv
from filelock import FileLock
from logging.handlers import RotatingFileHandler
from telegram import Update, BotCommand
from telegram.request import HTTPXRequest
from telegram.constants import ParseMode
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes, JobQueue
# ---------------------------------------------------------------------------
# ✦ Environment & config
# ---------------------------------------------------------------------------
load_dotenv()

# Optional fallback: a local ``tokens`` module may supply defaults that are used
# when the corresponding environment variables are not set.
try:
    from tokens import (
        TOKEN as _TG_TOKEN_FILE,
        MONO_TOKEN as _MONO_TOKEN_FILE,
        ALLOWED_USERS as _ALLOWED_USERS_FILE,
    )
except ImportError:
    _TG_TOKEN_FILE = _MONO_TOKEN_FILE = None
    _ALLOWED_USERS_FILE = []

TOKEN: str | None = os.getenv('TELEGRAM_TOKEN', _TG_TOKEN_FILE)  # type: ignore
MONO_TOKEN: str | None = os.getenv('MONO_TOKEN', _MONO_TOKEN_FILE)  # type: ignore

# Comma-separated list of Telegram user ids. Entries that are empty or contain
# only whitespace (e.g. a trailing ", ") are ignored instead of crashing int().
ALLOWED_USERS: set[int] = {
    int(x)
    for x in os.getenv('ALLOWED_USERS', ','.join(map(str, _ALLOWED_USERS_FILE))).split(',')
    if x.strip()
}

# Refuse to start without both tokens and at least one allowed user.
if not (TOKEN and MONO_TOKEN and ALLOWED_USERS):
    raise RuntimeError('Provide TELEGRAM_TOKEN, MONO_TOKEN and ALLOWED_USERS')

# All runtime files live in the current working directory.
DATA_DIR = Path.cwd()
LOG_FILE = DATA_DIR / 'logs.txt'
LAST_TIME_FILE = DATA_DIR / 'last_time.txt'
LOCK_FILE = DATA_DIR / 'last_time.lock'
# ---------------------------------------------------------------------------
# Rotating file log: up to ~2 MB per file, five backups kept, UTF-8 encoded.
handler = RotatingFileHandler(
    LOG_FILE,
    maxBytes=2_000_000,
    backupCount=5,
    encoding='utf-8',
)
handler.setFormatter(
    logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s')
)

# Named logger used by the ``log`` helper; INFO and above are recorded.
logger = logging.getLogger('mono_bot')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
def log(msg: str, lvl=logging.INFO):
    """Emit *msg* through the module logger at level *lvl* (INFO by default)."""
    logger.log(lvl, msg)
def escape_md(text: str) -> str:
    """Backslash-escape every Telegram MarkdownV2 special character in *text*.

    The escaped set is ``\\ _ * [ ] ( ) ~ ` > # + - = | { } . !``.
    A falsy *text* (``''`` or ``None``) yields an empty string.
    """
    if not text:
        return ''
    # One character class holding all reserved characters; ``\[``, ``\]`` and
    # ``\-`` are escaped so they stay literal members of the class.
    return re.sub(r'([\\_*\[\]()~`>#+\-=|{}.!])', r'\\\1', text)
# Text appended to formatted amounts, keyed by numeric currency code.
# 840 / 978 / 985 are the codes labelled USD / EUR / PLN in ``cmd_currency``.
# NOTE(review): only 840 carries a visible symbol; the empty strings mean those
# amounts are rendered with no currency suffix — confirm this is intended and
# not the result of characters being lost.
currency_symbols: Dict[int, str] = {
    980: '',
    840: '$',
    978: '',
    985: '',
}
# Emoji prefix for a transaction, keyed by its ``mcc`` value. ``fmt`` looks the
# code up here and falls back to '💳' when the code is not listed. Several codes
# deliberately share one emoji. Code 4829 is special-cased in ``fmt``: a negative
# amount gets '👈💳' instead of the entry below.
mcc_emojis: Dict[int, str] = {
5411:'🛒',5499:'🛒',5441:'🍰',5422:'🥩',5812:'🍽️',5813:'🍽️',5814:'☕️',
5912:'💊',5977:'💊',4112:'🚌️',4131:'🚌️',4111:'🚌️',5541:'⛽️',5542:'⛽️',
4121:'🚕',5993:'🚬',5211:'👷',7832:'🎮',7922:'🎮',5732:'🖥',4814:'📱',
4900:'💡',4816:'🌐',5817:'▶️',8398:'🎗️',6011:'🏦',6012:'🏦',4829:'💳→'
}
class MonoClient:
BASE = 'https://api.monobank.ua'
def __init__(self, token: str, session: aiohttp.ClientSession):
self.h = {'X-Token': token}; self.s = session
async def _get(self, path:str):
async with self.s.get(f'{self.BASE}{path}', headers=self.h) as r:
if r.status==429: raise RuntimeError('Rate-limit')
r.raise_for_status(); return await r.json()
async def client_info(self): return await self._get('/personal/client-info')
async def statement(self, acc:str, frm:int, to:int|None=None):
return await self._get(f'/personal/statement/{acc}/{frm}'+(f'/{to}' if to else ''))
async def currency(self): return await self._get('/bank/currency')
def load_last() -> int:
    """Return the timestamp stored in ``LAST_TIME_FILE``.

    If the file cannot be read or parsed as an integer, a warning is logged
    and the current time is returned instead.
    """
    try:
        raw = LAST_TIME_FILE.read_text()
        stamp = int(raw)
    except Exception:
        log('last_time missing', logging.WARNING)
        stamp = int(time.time())
    return stamp
def save_last(ts: int):
    """Persist *ts* to ``LAST_TIME_FILE``, capped at the current time.

    The value is written to a sibling ``.tmp`` file which then replaces the
    target, all while holding the file lock at ``LOCK_FILE``.
    """
    capped = min(ts, int(time.time()))  # never store a timestamp in the future
    scratch = LAST_TIME_FILE.with_suffix('.tmp')
    lock = FileLock(str(LOCK_FILE))
    with lock:
        scratch.write_text(str(capped))
        scratch.replace(LAST_TIME_FILE)
    log(f'saved last_time {datetime.fromtimestamp(capped)}')
def fmt(tx: Dict[str, Any], accounts: List[Dict[str, Any]], owner: str) -> str:
    """Render one statement entry as an HTML message.

    The message has four lines: the owner in bold, the amount line (emoji,
    amount and optional cashback), the description with an optional
    ``(MCC: n)`` suffix, and the balance. Monetary fields of *tx* are divided
    by 100 before being displayed. Dynamic text is HTML-escaped.
    """
    amount = tx['amount'] / 100
    op_amount = tx['operationAmount'] / 100
    tx_currency = tx['currencyCode']

    # Currency of the account the entry belongs to; if no account in
    # ``accounts`` matches tx['account'], the entry's own currency is used.
    account_currency = tx_currency
    for account in accounts:
        if account['id'] == tx.get('account'):
            account_currency = account['currencyCode']
            break

    tx_symbol = currency_symbols.get(tx_currency, f'({tx_currency})')
    account_symbol = currency_symbols.get(account_currency, f'({account_currency})')

    # A negative amount with MCC 4829 gets its own marker; otherwise the MCC table decides.
    if tx.get('mcc') == 4829 and amount < 0:
        emoji = '👈💳'
    else:
        emoji = mcc_emojis.get(tx.get('mcc'), '💳')

    if tx_currency == account_currency:
        line = f"{emoji} {amount:.2f}{tx_symbol}"
    else:
        # Different currencies: show the operation amount, then the account amount.
        line = f"{emoji} {op_amount:.2f}{tx_symbol} ({amount:.2f}{account_symbol})"

    if tx.get('cashbackAmount'):
        line += f", кешбек {tx['cashbackAmount']/100:.2f}{tx_symbol}"

    description = tx.get("description", "")
    mcc_code = tx.get("mcc")
    mcc_suffix = f" (MCC: {mcc_code})" if mcc_code else ""
    balance = tx["balance"] / 100

    parts = [
        f"<b>{html.escape(owner)}</b>",
        f"{html.escape(line)}",
        f"{html.escape(description + mcc_suffix)}",
        f"<b>Баланс:</b> {balance:.2f}{account_symbol}",
    ]
    return "\n".join(parts)
async def auth(update: "Update"):
    """Check that the update comes from a user listed in ``ALLOWED_USERS``.

    Returns True for an allowed user. Otherwise replies with '⛔️' (when the
    update carries a message to reply to) and returns False.
    """
    user = update.effective_user
    if user is not None and user.id in ALLOWED_USERS:
        return True
    # ``update.message`` is None for edited messages, so reply through
    # ``effective_message``, which covers both new and edited messages.
    message = update.effective_message
    if message is not None:
        await message.reply_text('⛔️')
    return False
async def cmd_start(u: "Update", c):
    """/start — reply with the list of supported commands (allowed users only)."""
    if not await auth(u):
        return
    # ``effective_message`` also works when the command arrives as an edited
    # message, where ``u.message`` would be None.
    await u.effective_message.reply_text('👋 /status /balance /currency /history [d]')
async def cmd_status(u: "Update", c):
    """/status — reply with the stored last-check timestamp, formatted as local time."""
    if not await auth(u):
        return
    stamp = datetime.fromtimestamp(load_last())
    # ``effective_message`` also works when the command arrives as an edited
    # message, where ``u.message`` would be None.
    await u.effective_message.reply_text(f'🕒 {stamp:%Y-%m-%d %H:%M:%S}')
async def cmd_balance(u: "Update", c):
    """/balance — reply with one line per account: type, balance and currency.

    Balances from the client-info response are divided by 100 for display.
    The currency suffix comes from ``currency_symbols``; unknown codes are
    shown as the raw code.
    """
    if not await auth(u):
        return
    mono = c.bot_data['mono']
    d = await mono.client_info()
    rows = [
        f"{a.get('type','Card').capitalize()}: {a['balance']/100:.2f}{currency_symbols.get(a['currencyCode'],a['currencyCode'])}"
        for a in d['accounts']
    ]
    # ``effective_message`` also works when the command arrives as an edited
    # message, where ``u.message`` would be None.
    await u.effective_message.reply_text(
        '💰 Баланс:\n' + '\n'.join(rows),
        parse_mode=ParseMode.HTML,
    )
async def cmd_currency(u: "Update", c):
    """/currency — reply with rates for USD, EUR and PLN quoted against code 980.

    For each matching pair the buy/sell rates are shown when both are present,
    otherwise the cross rate, otherwise nothing after the currency label.
    """
    if not await auth(u):
        return
    mono = c.bot_data['mono']
    data = await mono.currency()
    tgt = {840: 'USD', 978: 'EUR', 985: 'PLN'}
    out = []
    for r in data:
        if r.get('currencyCodeA') not in tgt or r.get('currencyCodeB') != 980:
            continue
        code = tgt[r['currencyCodeA']]
        buy, sell, cross = r.get('rateBuy'), r.get('rateSell'), r.get('rateCross')
        if buy and sell:
            rate = '🟢 %.2f / 🔴 %.2f' % (buy, sell)
        elif cross:
            rate = '🔄 %.2f' % cross
        else:
            rate = ''
        out.append(f"{code}: {rate}")
    # ``effective_message`` also works when the command arrives as an edited
    # message, where ``u.message`` would be None.
    await u.effective_message.reply_text(
        '💱:\n' + '\n'.join(out),
        parse_mode=ParseMode.HTML,
    )
async def cmd_history(u: "Update", c):
    """/history [d] — send every transaction of the first account for the last *d* days.

    *d* defaults to 1 when missing or not a valid integer and is clamped to
    1..31. Transactions are sent oldest first, one message each with a
    one-second pause, followed by a message with the total count.
    """
    if not await auth(u):
        return
    # ``effective_message`` also works when the command arrives as an edited
    # message, where ``u.message`` would be None.
    message = u.effective_message

    # int() is the source of truth for "is this a number": str.isdigit()
    # accepts characters such as '²' that int() rejects.
    try:
        days = int(c.args[0]) if c.args else 1
    except ValueError:
        days = 1
    # The statement endpoint accepts a range of at most 31 days + 1 hour.
    max_days = 31
    days = min(max(days, 1), max_days)

    now = int(time.time())
    frm = now - days * 86400
    mono = c.bot_data['mono']
    info = await mono.client_info()
    accounts = info['accounts']
    txs = await mono.statement(accounts[0]['id'], frm, now)
    if not txs:
        await message.reply_text('Немає транзакцій')
        return

    txs.sort(key=lambda x: x['time'])
    for tx in txs:
        await message.reply_text(fmt(tx, accounts, info['name']), parse_mode=ParseMode.HTML)
        await asyncio.sleep(1)  # pause between consecutive messages
    await message.reply_text(f'{len(txs)} транзакцій')
async def check(ctx):
    """Periodic job: notify every allowed user about new transactions.

    Fetches the first account's statement starting at the stored cursor,
    sends each entry at or after the cursor to every id in ``ALLOWED_USERS``
    (oldest first, one-second pause after each message) and then advances the
    cursor to one second past the newest entry. If the statement is empty the
    cursor moves to the current time.
    """
    last = load_last()
    now = int(time.time())
    mono: MonoClient = ctx.bot_data['mono']
    info = await mono.client_info()
    txs = await mono.statement(info['accounts'][0]['id'], last)
    if not txs:
        save_last(now + 1)
        return

    txs.sort(key=lambda x: x['time'])
    for tx in txs:
        # The cursor is stored as "newest seen + 1", so an entry stamped exactly
        # at the cursor has NOT been reported yet; only strictly older entries
        # are skipped. (save_last caps the stored value at the current time, so
        # an entry stamped at or after local "now" may be re-sent on the next
        # run — a duplicate is preferred over a lost notification.)
        if tx['time'] < last:
            continue
        msg = fmt(tx, info['accounts'], info['name'])
        for uid in ALLOWED_USERS:
            await ctx.bot.send_message(uid, msg, parse_mode=ParseMode.HTML)
            await asyncio.sleep(1)
    save_last(max(t['time'] for t in txs) + 1)
async def setup(app):
    """Register the bot's command list (name and description) with Telegram."""
    menu = (
        ('start', 'Запустити бота'),
        ('status', 'Статус'),
        ('balance', 'Баланс'),
        ('currency', 'Курс'),
        ('history', 'Історія'),
    )
    commands = [BotCommand(name, description) for name, description in menu]
    await app.bot.set_my_commands(commands)
async def run():
    """Build the Telegram application, start polling and run until cancelled.

    One aiohttp session is shared by the Monobank client for the whole
    lifetime of the bot. The ``check`` job is scheduled every 300 seconds,
    first run after 5 seconds. On exit the updater and application are
    stopped and shut down.
    """
    async with aiohttp.ClientSession() as sess:
        mono = MonoClient(MONO_TOKEN, sess)
        req = HTTPXRequest(read_timeout=30.0, write_timeout=30.0, connect_timeout=5.0)
        app = ApplicationBuilder().token(TOKEN).post_init(setup).request(req).build()
        app.bot_data['mono'] = mono

        handlers = (
            ('start', cmd_start),
            ('status', cmd_status),
            ('balance', cmd_balance),
            ('currency', cmd_currency),
            ('history', cmd_history),
        )
        for name, callback in handlers:
            app.add_handler(CommandHandler(name, callback))

        app.job_queue.run_repeating(check, 300, first=5)

        await app.initialize()
        # The post_init callback is only invoked by run_polling()/run_webhook().
        # With this manual lifecycle it must be called explicitly, otherwise the
        # command list is never registered.
        await setup(app)
        await app.start()
        await app.updater.start_polling()
        try:
            # Block forever; cancellation (e.g. Ctrl-C) falls through to cleanup.
            await asyncio.Future()
        finally:
            await app.updater.stop()
            await app.stop()
            await app.shutdown()
if __name__ == '__main__':
    # Script entry point: run the bot's coroutine on a fresh event loop.
    asyncio.run(run())