322 lines
10 KiB
Python
322 lines
10 KiB
Python
import json
|
||
import os
|
||
import random
|
||
import subprocess
|
||
import urllib.request
|
||
import requests
|
||
from pathlib import Path
|
||
|
||
from dotenv import load_dotenv
|
||
from telegram import Update
|
||
from telegram.ext import (
|
||
ApplicationBuilder,
|
||
CommandHandler,
|
||
ContextTypes,
|
||
ConversationHandler,
|
||
MessageHandler,
|
||
filters,
|
||
)
|
||
|
||
PREFIX_WORD = "серГЕЙ"
|
||
JSON_FILE = "sergay.json"
|
||
SERVICE_NAME = "sergay-bot"
|
||
|
||
# Токен для связи с сайтом (тот, что мы прописывали в api.php)
|
||
SITE_API_TOKEN = "super_secret_bot_token_123"
|
||
|
||
WAITING_NEW_PHRASE = 1
|
||
P_TITLE = 2
|
||
P_PHOTOS = 3
|
||
|
||
|
||
def load_phrases(file_path: str) -> list[str]:
|
||
path = Path(file_path)
|
||
|
||
if not path.exists():
|
||
return []
|
||
|
||
with path.open("r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
|
||
phrases = []
|
||
|
||
if isinstance(data, list):
|
||
for item in data:
|
||
if isinstance(item, dict):
|
||
item_id = item.get("id")
|
||
if isinstance(item_id, int):
|
||
value = item.get(str(item_id))
|
||
if isinstance(value, str) and value.strip():
|
||
phrases.append(value.strip())
|
||
else:
|
||
for value in item.values():
|
||
if isinstance(value, str) and value.strip():
|
||
phrases.append(value.strip())
|
||
elif isinstance(item, str) and item.strip():
|
||
phrases.append(item.strip())
|
||
|
||
return phrases
|
||
|
||
|
||
def load_raw_data(file_path: str) -> list:
|
||
path = Path(file_path)
|
||
|
||
if not path.exists():
|
||
return []
|
||
|
||
with path.open("r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
|
||
if not isinstance(data, list):
|
||
raise ValueError("JSON должен содержать список")
|
||
|
||
return data
|
||
|
||
|
||
def save_new_phrase(file_path: str, phrase: str) -> None:
|
||
phrase = phrase.strip()
|
||
if not phrase:
|
||
raise ValueError("Пустую фразу добавлять нельзя")
|
||
|
||
data = load_raw_data(file_path)
|
||
|
||
existing_phrases = set()
|
||
max_id = 0
|
||
|
||
for item in data:
|
||
if isinstance(item, dict):
|
||
item_id = item.get("id")
|
||
if isinstance(item_id, int):
|
||
max_id = max(max_id, item_id)
|
||
value = item.get(str(item_id))
|
||
if isinstance(value, str):
|
||
existing_phrases.add(value.strip())
|
||
|
||
if phrase in existing_phrases:
|
||
raise ValueError("Такая фраза уже есть в базе")
|
||
|
||
new_id = max_id + 1
|
||
data.append({
|
||
"id": new_id,
|
||
str(new_id): phrase
|
||
})
|
||
|
||
path = Path(file_path)
|
||
with path.open("w", encoding="utf-8") as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=4)
|
||
|
||
|
||
def restart_service() -> None:
|
||
subprocess.Popen(
|
||
["systemctl", "restart", SERVICE_NAME],
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
start_new_session=True,
|
||
)
|
||
|
||
|
||
def run_gitup() -> tuple[bool, str]:
|
||
result = subprocess.run(
|
||
["bash", "gitup.sh"],
|
||
capture_output=True,
|
||
text=True,
|
||
)
|
||
|
||
output = (result.stdout or "").strip()
|
||
error = (result.stderr or "").strip()
|
||
|
||
if result.returncode == 0:
|
||
return True, output or "gitup выполнен"
|
||
return False, error or output or "Ошибка выполнения gitup"
|
||
|
||
|
||
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||
text = (
|
||
"Вот список команд:\n\n"
|
||
"Получить фразу — /phrase\n"
|
||
"Добавить фразу — /new\n"
|
||
"Добавить картинки на сайт — /new_p\n"
|
||
)
|
||
await update.message.reply_text(text)
|
||
|
||
|
||
async def phrase(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||
try:
|
||
phrases = load_phrases(JSON_FILE)
|
||
if not phrases:
|
||
text = "Фразы пока не добавлены"
|
||
else:
|
||
phrase_text = random.choice(phrases)
|
||
text = f"{PREFIX_WORD} {phrase_text}".strip()
|
||
except Exception as e:
|
||
text = f"Ошибка: {e}"
|
||
|
||
# Шанс 30% прикрепить картинку
|
||
if random.random() < 0.3 and not text.startswith("Ошибка"):
|
||
try:
|
||
req = urllib.request.Request("https://www.vlv-s.site/api.php?action=random")
|
||
with urllib.request.urlopen(req, timeout=5) as response:
|
||
api_data = json.loads(response.read().decode('utf-8'))
|
||
img_url = api_data.get("url")
|
||
|
||
if img_url:
|
||
await context.bot.send_photo(
|
||
chat_id=update.effective_chat.id,
|
||
photo=img_url,
|
||
caption=text
|
||
)
|
||
return
|
||
except Exception as e:
|
||
print(f"Ошибка загрузки фото из API: {e}")
|
||
|
||
await update.message.reply_text(text)
|
||
|
||
|
||
async def new_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||
await update.message.reply_text("Введите фразу:")
|
||
return WAITING_NEW_PHRASE
|
||
|
||
|
||
async def new_save(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||
phrase_text = (update.message.text or "").strip()
|
||
|
||
if not phrase_text:
|
||
await update.message.reply_text("Фраза пустая. Введите фразу:")
|
||
return WAITING_NEW_PHRASE
|
||
|
||
try:
|
||
save_new_phrase(JSON_FILE, phrase_text)
|
||
await update.message.reply_text(f"Добавленно {PREFIX_WORD} {phrase_text}")
|
||
restart_service()
|
||
except Exception as e:
|
||
await update.message.reply_text(f"Ошибка: {e}")
|
||
|
||
return ConversationHandler.END
|
||
|
||
|
||
async def new_cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||
await update.message.reply_text("Отменено")
|
||
return ConversationHandler.END
|
||
|
||
|
||
# --- ЛОГИКА ДЛЯ /new_p (Загрузка картинок) ---
|
||
async def new_p_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||
context.user_data['new_p_photos'] = []
|
||
await update.message.reply_text("Введите заголовок для новых картинок (он применится ко всем загружаемым сейчас фото):")
|
||
return P_TITLE
|
||
|
||
async def new_p_title(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||
context.user_data['new_p_title'] = update.message.text.strip()
|
||
await update.message.reply_text(
|
||
"Принято! Теперь отправляй мне картинки.\n"
|
||
"Можешь отправить одну или выделить сразу несколько в галерее.\n\n"
|
||
"Когда все нужные картинки загрузятся, отправь команду /done"
|
||
)
|
||
return P_PHOTOS
|
||
|
||
async def new_p_photo(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||
if update.message.photo:
|
||
# Сохраняем ID самого большого размера картинки
|
||
photo_id = update.message.photo[-1].file_id
|
||
context.user_data['new_p_photos'].append(photo_id)
|
||
return P_PHOTOS
|
||
|
||
async def new_p_done(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
|
||
photos = context.user_data.get('new_p_photos', [])
|
||
title = context.user_data.get('new_p_title', 'Новое фото')
|
||
|
||
if not photos:
|
||
await update.message.reply_text("Ты не отправил ни одной картинки. Операция отменена.")
|
||
return ConversationHandler.END
|
||
|
||
await update.message.reply_text(f"⏳ Скачиваю и отправляю {len(photos)} фото на сервер. Это может занять пару минут...")
|
||
|
||
try:
|
||
files_payload = []
|
||
for i, pid in enumerate(photos):
|
||
new_file = await context.bot.get_file(pid)
|
||
p_bytes = await new_file.download_as_bytearray()
|
||
# Формируем пакет файлов так же, как отправляет браузер (массив photos[])
|
||
files_payload.append(('photos[]', (f'img_tg_{i}.jpg', p_bytes, 'image/jpeg')))
|
||
|
||
data_payload = {
|
||
'title': title,
|
||
'description': '',
|
||
'api_token': SITE_API_TOKEN
|
||
}
|
||
|
||
resp = requests.post("https://www.vlv-s.site/api.php?action=upload", files=files_payload, data=data_payload)
|
||
resp_json = resp.json()
|
||
|
||
if resp_json.get('success'):
|
||
await update.message.reply_text("✅ Успех! Картинки добавлены в галерею на сайт.")
|
||
else:
|
||
await update.message.reply_text(f"❌ Сайт вернул ошибку: {resp_json}")
|
||
|
||
except Exception as e:
|
||
await update.message.reply_text(f"❌ Ошибка соединения или загрузки: {e}")
|
||
|
||
# Очищаем данные
|
||
context.user_data.clear()
|
||
return ConversationHandler.END
|
||
|
||
|
||
async def gitup(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||
await update.message.reply_text("Запускаю gitup...")
|
||
|
||
ok, output = run_gitup()
|
||
|
||
if len(output) > 3500:
|
||
output = output[:3500] + "\n\n...вывод обрезан"
|
||
|
||
if ok:
|
||
await update.message.reply_text(f"gitup выполнен\n\n{output}")
|
||
else:
|
||
await update.message.reply_text(f"Ошибка gitup\n\n{output}")
|
||
|
||
|
||
def main() -> None:
|
||
load_dotenv()
|
||
token = os.getenv("TG_TOKEN")
|
||
|
||
if not token:
|
||
raise ValueError("Не найден TG_TOKEN в .env")
|
||
|
||
app = ApplicationBuilder().token(token).build()
|
||
|
||
new_handler = ConversationHandler(
|
||
entry_points=[CommandHandler("new", new_start)],
|
||
states={
|
||
WAITING_NEW_PHRASE: [
|
||
MessageHandler(filters.TEXT & ~filters.COMMAND, new_save)
|
||
],
|
||
},
|
||
fallbacks=[CommandHandler("cancel", new_cancel)],
|
||
)
|
||
|
||
new_p_handler = ConversationHandler(
|
||
entry_points=[CommandHandler("new_p", new_p_start)],
|
||
states={
|
||
P_TITLE: [
|
||
MessageHandler(filters.TEXT & ~filters.COMMAND, new_p_title)
|
||
],
|
||
P_PHOTOS: [
|
||
MessageHandler(filters.PHOTO, new_p_photo),
|
||
CommandHandler("done", new_p_done)
|
||
]
|
||
},
|
||
fallbacks=[CommandHandler("cancel", new_cancel)],
|
||
)
|
||
|
||
app.add_handler(CommandHandler("start", start))
|
||
app.add_handler(CommandHandler("phrase", phrase))
|
||
app.add_handler(CommandHandler("gitup", gitup))
|
||
app.add_handler(new_handler)
|
||
app.add_handler(new_p_handler)
|
||
|
||
app.run_polling()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|