sergay/code/sergay.py

219 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import random
import subprocess
from pathlib import Path
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
ConversationHandler,
MessageHandler,
filters,
)
PREFIX_WORD = "Сергей"
JSON_FILE = "sergay.json"
SERVICE_NAME = "sergay-bot"
WAITING_NEW_PHRASE = 1
def load_phrases(file_path: str) -> list[str]:
path = Path(file_path)
if not path.exists():
return []
with path.open("r", encoding="utf-8") as f:
data = json.load(f)
phrases = []
if isinstance(data, list):
for item in data:
if isinstance(item, dict):
item_id = item.get("id")
if isinstance(item_id, int):
value = item.get(str(item_id))
if isinstance(value, str) and value.strip():
phrases.append(value.strip())
else:
for value in item.values():
if isinstance(value, str) and value.strip():
phrases.append(value.strip())
elif isinstance(item, str) and item.strip():
phrases.append(item.strip())
return phrases
def load_raw_data(file_path: str) -> list:
path = Path(file_path)
if not path.exists():
return []
with path.open("r", encoding="utf-8") as f:
data = json.load(f)
if not isinstance(data, list):
raise ValueError("JSON должен содержать список")
return data
def save_new_phrase(file_path: str, phrase: str) -> None:
phrase = phrase.strip()
if not phrase:
raise ValueError("Пустую фразу добавлять нельзя")
data = load_raw_data(file_path)
existing_phrases = set()
max_id = 0
for item in data:
if isinstance(item, dict):
item_id = item.get("id")
if isinstance(item_id, int):
max_id = max(max_id, item_id)
value = item.get(str(item_id))
if isinstance(value, str):
existing_phrases.add(value.strip())
if phrase in existing_phrases:
raise ValueError("Такая фраза уже есть в базе")
new_id = max_id + 1
data.append({
"id": new_id,
str(new_id): phrase
})
path = Path(file_path)
with path.open("w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
def restart_service() -> None:
subprocess.Popen(
f"bash /root/sergay/code/gitup.sh && systemctl restart {SERVICE_NAME}",
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
start_new_session=True,
)
def run_gitup() -> tuple[bool, str]:
result = subprocess.run(
["bash", "gitup.sh"],
capture_output=True,
text=True,
)
output = (result.stdout or "").strip()
error = (result.stderr or "").strip()
if result.returncode == 0:
return True, output or "gitup выполнен"
return False, error or output or "Ошибка выполнения gitup"
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
text = (
"Вот список команд:\n\n"
"Получить фразу — /phrase\n"
"Добавить фразу — /new\n"
)
await update.message.reply_text(text)
async def phrase(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
try:
phrases = load_phrases(JSON_FILE)
if not phrases:
text = "Фразы пока не добавлены"
else:
phrase_text = random.choice(phrases)
text = f"{PREFIX_WORD} {phrase_text}".strip()
except Exception as e:
text = f"Ошибка: {e}"
await update.message.reply_text(text)
async def new_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await update.message.reply_text("Введите фразу:")
return WAITING_NEW_PHRASE
async def new_save(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
phrase_text = (update.message.text or "").strip()
if not phrase_text:
await update.message.reply_text("Фраза пустая. Введите фразу:")
return WAITING_NEW_PHRASE
try:
save_new_phrase(JSON_FILE, phrase_text)
await update.message.reply_text(f"Добавленно {PREFIX_WORD} {phrase_text}")
restart_service()
except Exception as e:
await update.message.reply_text(f"Ошибка: {e}")
return ConversationHandler.END
async def new_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await update.message.reply_text("Отменено")
return ConversationHandler.END
async def gitup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text("Запускаю gitup...")
ok, output = run_gitup()
if len(output) > 3500:
output = output[:3500] + "\n\n...вывод обрезан"
if ok:
await update.message.reply_text(f"gitup выполнен\n\n{output}")
else:
await update.message.reply_text(f"Ошибка gitup\n\n{output}")
def main() -> None:
load_dotenv()
token = os.getenv("TG_TOKEN")
if not token:
raise ValueError("Не найден TG_TOKEN в .env")
app = ApplicationBuilder().token(token).build()
new_handler = ConversationHandler(
entry_points=[CommandHandler("new", new_start)],
states={
WAITING_NEW_PHRASE: [
MessageHandler(filters.TEXT & ~filters.COMMAND, new_save)
],
},
fallbacks=[CommandHandler("cancel", new_cancel)],
)
app.add_handler(CommandHandler("start", start))
app.add_handler(CommandHandler("phrase", phrase))
app.add_handler(CommandHandler("gitup", gitup))
app.add_handler(new_handler)
app.run_polling()
if __name__ == "__main__":
main()