Удалено: весь код на Python (максимально по-зверски)

This commit is contained in:
far-galaxy 2023-03-15 16:40:09 +04:00
parent 7934d3b4ba
commit 7d3ac7ee6a
13 changed files with 0 additions and 1200 deletions

270
bot.py
View File

@ -1,270 +0,0 @@
from database.l9 import L9_DB
from database.tg import TG_DB
from database.shedule import Shedule_DB
from utils.config import *
from utils.stuff import *
import telegram
from tg.keyboards import Keyboard
import logging
from logging.handlers import TimedRotatingFileHandler as TRFL
import configparser
import datetime
logger = logging.getLogger('bot')
def initLogger():
if not os.path.isdir(f'logs/bot'):
os.makedirs(f'logs/bot')
f_handler = TRFL(f'./logs/bot/log', when='midnight', encoding="utf-8")
f_format = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s',
datefmt='%d-%b-%y %H:%M:%S',
)
f_handler.setFormatter(f_format)
f_handler.setLevel(logging.INFO)
logger.addHandler(f_handler)
c_handler = logging.StreamHandler()
c_format = logging.Formatter('%(levelname)s : %(message)s')
c_handler.setFormatter(c_format)
logger.addHandler(c_handler)
logger.setLevel(logging.DEBUG)
class Bot:
def __init__(
self,
token: str,
db: L9_DB,
tg_db: TG_DB,
shedule: Shedule_DB,
limit=150,
):
self.l9lk = db
self.tg_db = tg_db
self.shedule = shedule
self.tg = telegram.Bot(token)
self.limit = limit
self.udpate_id = None
self.isWork = True
def answer(self, query: telegram.CallbackQuery, text=None, alert=False):
try:
query.answer(text, alert)
except telegram.error.BadRequest:
pass
def edit(self, query: telegram.CallbackQuery, text=None, markup=None):
if isinstance(text, str):
try:
query.edit_message_text(text)
except telegram.error.BadRequest:
pass
if isinstance(markup, telegram.ReplyKeyboardMarkup):
try:
query.edit_message_reply_markup(text)
except telegram.error.BadRequest:
pass
def checkMessages(self):
"""Проверка и обработка входящих сообщений"""
updates = self.tg.get_updates(offset=self.udpate_id, timeout=5)
for update in updates:
self.udpate_id = update.update_id + 1
if update.callback_query:
query = update.callback_query
tag, l9Id, log = self.tg_db.getTagC(query)
logger.info(log)
tgId = query.from_user.id
if 'conf' in tag:
self.confirmGroup(query, tag, l9Id)
if update.message:
query = update.message
tag, l9Id, log = self.tg_db.getTagM(query)
logger.info(log)
tgId = query.from_user.id
if tag == 'not_started':
self.start(query)
elif query.text == 'Отмена':
if self.shedule.getGroups(l9Id) != None:
self.tg_db.changeTag(tgId, 'ready')
self.tg.sendMessage(
tgId,
loc['etc']['cancel'],
reply_markup=Keyboard.menu(),
)
else:
self.tg_db.changeTag(tgId, 'add')
self.tg.sendMessage(
tgId,
loc['etc']['need_group'],
reply_markup=Keyboard.menu(),
)
elif tag == 'add':
self.addGroup(l9Id, query)
elif query.text == 'Главное меню':
now = query.date
# now = datetime.datetime(2023, 2, 6, 14, 0)
pairs = self.shedule.nearLesson(now, l9Id)
if pairs != None:
pair = pairs[0][0][1]
if pair.date() > now.date():
text = loc['shedule']['next_days']
day = datetime.timedelta(days=1)
if pair.date() - now.date() == day:
text += f" {loc['shedule']['tomorrow']}:\n"
else:
text += (
f' {pair.day} {month[pair.month-1]}:\n'
)
elif pair.time() > now.time():
text = f"{loc['shedule']['today']}:\n"
else:
text = f"{loc['shedule']['now']}:\n"
text += self.shedule.strLesson(
[p[0] for p in pairs[0]]
)
if len(pairs) == 2 and pair.date() == now.date():
text += f"\n{loc['shedule']['next']}:\n"
text += self.shedule.strLesson(
[p[0] for p in pairs[1]]
)
elif pair.date() == now.date():
text += f"\n{loc['shedule']['next_empty']}"
# TODO: Добавить смайликов
self.tg.sendMessage(
tgId,
text,
reply_markup=Keyboard.menu(),
)
else:
self.tg.sendMessage(
tgId,
loc['shedule']['no_shedule'],
reply_markup=Keyboard.menu(),
)
else:
self.tg.sendMessage(
tgId,
loc['etc']['oops'],
reply_markup=Keyboard.menu(),
)
def start(self, query: telegram.Message):
"""Обработка нового пользователя"""
# Проверка лимита пользователей и обработка лишних
count = self.l9lk.countUsers()
tgId = query.from_user.id
if count >= self.limit:
self.tg.sendMessage(
tgId,
loc['etc']['overlimit'],
)
else:
self.tg_db.changeTag(tgId, 'add')
self.tg.sendMessage(
tgId,
loc['etc']['hello'],
)
def addGroup(self, l9Id: int, query: telegram.Message):
"""Процесс добавления группы"""
groupName = query.text
tgId = query.from_user.id
result = self.shedule.checkGroupExists(groupName, l9Id)
if 'OK' in result:
_, groupName, specName = result.split(';')
self.tg_db.changeTag(tgId, 'ready')
self.tg.sendMessage(
tgId,
loc['group']['connected'] % (groupName, specName),
reply_markup=Keyboard.menu(),
)
elif result == 'Exists':
self.tg.sendMessage(
tgId,
loc['group']['exists'],
reply_markup=Keyboard.cancel(),
)
elif result == 'Error':
self.tg.sendMessage(
tgId,
loc['group']['error'],
reply_markup=Keyboard.cancel(),
)
elif 'ssau.ru' in result:
self.tg_db.changeTag(tgId, f'conf_{result[21:]}')
self.tg.sendMessage(
tgId,
loc['group']['checkShedule'] % (result),
reply_markup=Keyboard.confirm(),
)
else:
self.tg.sendMessage(
tgId,
loc['group']['empty'],
reply_markup=Keyboard.cancel(),
)
def confirmGroup(
self, query: telegram.CallbackQuery, tag: str, l9Id: str
):
"""Процесс подтверждения группы и загрузка расписания"""
tgId = query.from_user.id
self.answer(query)
if query.data == 'yes':
self.edit(query, loc['group']['loading'])
self.shedule.loadShedule(tag[5:], query.message.date)
self.shedule.db.insert(
'groups_users',
{'l9Id': l9Id, 'groupId': tag[5:]},
)
self.edit(query, loc['group']['loaded'], Keyboard.menu())
self.tg_db.changeTag(tgId, 'ready')
else:
self.edit(query, loc['group']['nogroup'], Keyboard.cancel())
self.tg_db.changeTag(tgId, 'add')
if __name__ == "__main__":
initLogger()
logger.info("Start bot")
loc = configparser.ConfigParser()
loc.read('./locale/ru.ini', encoding='utf-8')
config = loadJSON("config")
l9lk = L9_DB(**config['sql'])
tg_db = TG_DB(l9lk)
shedule = Shedule_DB(l9lk, config['first_week'])
bot = Bot(
config['tg']['token'], l9lk, tg_db, shedule, config['tg']['limit']
)
logger.info("Bot ready!")
while bot.isWork:
msgs = bot.checkMessages()

View File

@ -1,207 +0,0 @@
from mysql.connector import connect
from mysql.connector.cursor_cext import CMySQLCursor
import random
class Database:
"""Модуль для mysql-connector"""
def __init__(self, host: str, user: str, password: str):
"""Подключение к серверу MySQL"""
self.database = connect(host=host, user=user, password=password)
self.cursor = self.database.cursor()
def execute(self, query: str, commit=False) -> CMySQLCursor:
"""Выполнить SQL запрос
Примечание: в целях безопасности функция игнорирует запросы DROP и TRUNCATE
Args:
:query: текст запроса
:commit: [optional] сохранить изменения
Returns:
:cursor: объект курсора
"""
if (
query.lower().find("drop") == -1
and query.lower().find("truncate") == -1
):
print(query)
self.cursor.execute(query)
if commit:
self.database.commit()
return self.cursor
def executeFile(self, filename: str, commit=False) -> CMySQLCursor:
"""Выполнить запрос из .sql файла
Args:
:filename: название файла (без расширения)
:commit: [optional] сохранить изменения
Returns:
:cursor: объект курсора
"""
with open(f'database/{filename}.sql', encoding='utf-8') as f:
query = f.read().split('\n\n\n')
return [self.execute(i, commit) for i in query]
def initDatabase(self, name: str):
"""Создать базу данных, если таковая отсутствует,
и переключиться на неё для использования в дальнейших запросах
Args:
:name: название базы данных
"""
self.execute(f"CREATE DATABASE IF NOT EXISTS {name};")
self.execute(f"USE {name};")
def initTable(self, name: str, head: str):
"""Создать таблицу, если таковая отсутствует
TODO: вырезать эту функцию, поскольку теперь БД инициализирутся
из файла
Args:
:name: название таблицы
:head: двумерный список, в строках которых описаны столбцы таблицы
"""
query = f"CREATE TABLE IF NOT EXISTS `{name}` ("
query += ", ".join([" ".join(i) for i in head])
query += ");"
self.execute(query)
def insert(self, name: str, values: dict):
"""Вставить значение в таблицу
Args:
:name: название таблицы
:values: словарь их названий столбцов и их значений
"""
query = f"INSERT IGNORE INTO `{name}` ("
query += ", ".join(values) + ") VALUES ("
query += (
", ".join(
[
f'"{i}"' if (i != None) else "NULL"
for i in values.values()
]
)
+ ");"
)
self.execute(query, commit=True)
def get(self, name: str, condition=None, columns=None) -> list:
"""Получить данные из таблицу по запросу вида:
:SELECT columns FROM name WHERE condition:
Args:
:name: название таблицы
:condition: SQL условие для выборки, для получения всех строк оставить None
:columns: [optional] список столбцов, которые необходимо выдать, для всех столбцов оставить None
"""
query = "SELECT " + (', '.join(columns) if columns != None else "*")
query += f" FROM `{name}`"
query += f" WHERE {condition};" if condition != None else ";"
result = self.execute(query).fetchall()
return result
def update(self, name: str, condition: str, new: str):
"""Обновить данные в строке
Args:
:name: название таблицы
:condition: SQL условие для выборки строки
:new: SQL условия для замены значений столбцов
"""
query = f"UPDATE {name}"
query += f" SET {new} WHERE {condition};"
self.execute(query, commit=True)
def newID(self, name: str, id_name: str) -> str:
"""Сгенерировать уникальный ID из 9 цифр
Args:
:name: название таблицы пользователей
:id_name: название столбца уникальных ID
Returns:
:someID: строка с уникальным ID
"""
someID = random.randint(100000000, 999999999)
result = self.get(name, f"{id_name} = {someID}")
exist = result != []
if not exist:
return str(someID)
else:
self.newID()
def checkTables(self, file: str):
"""Проверка текущей структуры таблиц с файлом"""
actual_tables = {}
with open(f'{file}.sql', encoding='utf-8') as actual:
table = []
last_table = None
for line in actual:
if 'CREATE TABLE' in line:
if last_table != None:
actual_tables[last_table] = table
table = []
last_table = line.split()[-2].replace('`', '')
elif not '--' in line and ');' not in line:
string = line.replace('\n', '').replace(
'NOT NULL', 'N_N'
)
string = string.split('\t')
string = [i for i in string if i != '']
if string != []:
table.append(string)
# Не теряем последнюю таблицу
actual_tables[last_table] = table
old_tables = self.execute(f'SHOW TABLES').fetchall()
old_tables = [i[0] for i in old_tables]
for act_table in actual_tables:
if act_table not in old_tables:
lines = "\n".join(
[" ".join(i) for i in actual_tables[act_table]]
)
self.execute(f'CREATE TABLE `{act_table}` ({lines})')
"""
else:
dump = self.execute(
f'SHOW CREATE TABLE `{act_table}`'
).fetchall()
dump = dump[0][1].replace('NOT NULL', 'N_N')
dump = dump.split('\n')
old_rows = [
i.split()
for i in dump
if (
('PRIMARY KEY' not in i)
and ('CONSTRAINT' not in i)
and ('CREATE' not in i)
and ('ENGINE' not in i)
)
]
# Корректируем имеющиеся столбцы
for act_row in actual_tables[act_table]:
rows = [i[0] for i in old_rows]
if (
act_row[0] in rows
and act_row != old_rows[rows.index(act_row[0])]
):
line = (
" ".join(act_row)
.replace('N_N', 'NOT NULL')
.replace(',', '')
)
self.execute(
f'ALTER TABLE `{act_table}` MODIFY COLUMN {line}'
)
"""

View File

@ -1,198 +0,0 @@
import requests
from bs4 import BeautifulSoup
from ast import literal_eval
import time
import logging
import datetime
from itertools import groupby
logger = logging.getLogger('bot')
def findInRasp(req: str):
"""Поиск группы (преподавателя) в расписании"""
logger.debug(f'Find {req}')
rasp = requests.Session()
rasp.headers['User-Agent'] = 'Mozilla/5.0'
hed = rasp.get("https://ssau.ru/rasp/")
if hed.status_code == 200:
soup = BeautifulSoup(hed.text, 'lxml')
csrf_token = soup.select_one('meta[name="csrf-token"]')['content']
else:
return 'Error'
time.sleep(1)
rasp.headers['Accept'] = 'application/json'
rasp.headers['X-CSRF-TOKEN'] = csrf_token
result = rasp.post("https://ssau.ru/rasp/search", data={'text': req})
if result.status_code == 200:
num = literal_eval(result.text)
else:
return 'Error'
if len(num) == 0:
return None
else:
return num[0]
def connect(groupId: str, week: int, reconnects=0) -> BeautifulSoup:
"""Подключение к сайту с расписанием"""
logger.debug(
f'Connecting to sasau, groupId = {groupId}, week N {week}, attempt {reconnects}'
)
rasp = requests.Session()
rasp.headers['User-Agent'] = 'Mozilla/5.0'
site = rasp.get(
f'https://ssau.ru/rasp?groupId={groupId}&selectedWeek={week}'
)
if site.status_code == 200:
contents = site.text.replace("\n", " ")
soup = BeautifulSoup(contents, 'html.parser')
return soup
elif reconnects < 5:
time.sleep(2)
return connect(groupId, week, reconnects + 1)
else:
raise 'Connection to sasau failed!'
def getGroupInfo(groupId: str) -> dict:
"""Получение информации о группе (ID, полный номер, название направления)"""
logger.debug(f'Getting group {groupId} information')
soup = connect(groupId, 1)
group_spec_soup = soup.find(
"div", {"class": "body-text info-block__description"}
)
group_spec = group_spec_soup.find("div").contents[0].text[1:]
group_name_soup = soup.find("h2", {"class": "h2-text info-block__title"})
group_name = group_name_soup.text[1:5]
info = {
'groupId': groupId,
'groupName': group_name,
'specName': group_spec,
}
return info
lesson_types = ('lect', 'lab', 'pract', 'other')
teacher_columns = ('surname', 'name', 'midname', 'teacherId')
def parseWeek(groupId: str, week: int, teachers=[]):
soup = connect(groupId, week)
dates_soup = soup.find_all("div", {"class": "schedule__head-date"})
dates = []
for date in dates_soup:
date = datetime.datetime.strptime(
date.contents[0].text, ' %d.%m.%Y'
).date()
dates.append(date)
blocks = soup.find("div", {"class": "schedule__items"})
blocks = [
item
for item in blocks
if "schedule__head" not in item.attrs["class"]
]
numInDay = 0
weekday = 0
times = []
shedule = []
week = []
for block in blocks:
if block.attrs['class'] == ['schedule__time']:
begin = datetime.datetime.strptime(
block.contents[0].text, ' %H:%M '
).time()
end = datetime.datetime.strptime(
block.contents[1].text, ' %H:%M '
).time()
times.append((begin, end))
numInDay += 1
weekday = 0
if numInDay != 1:
week = []
else:
begin_dt = datetime.datetime.combine(dates[weekday], begin)
end_dt = datetime.datetime.combine(dates[weekday], end)
sub_pairs = block.find_all("div", {"class": "schedule__lesson"})
pair = []
for sub_pair in sub_pairs:
if sub_pair != []:
name = sub_pair.select_one('div.schedule__discipline')
lesson_type = lesson_types[
int(name['class'][-1][-1]) - 1
]
name = name.text
place = sub_pair.select_one('div.schedule__place').text
place = place if "on" not in place.lower() else "ONLINE"
place = place if place != "" else None
teacher = sub_pair.select_one('.schedule__teacher a')
teacherId = (
teacher['href'][14:] if teacher != None else None
)
if teacher != None:
if teacherId not in [
str(i['teacherId']) for i in teachers
]:
teacher_name = teacher.text[:-4]
t_info = findInRasp(teacher_name)['text'].split()
t_info.append(teacherId)
teachers.append(
dict(zip(teacher_columns, t_info))
)
groups = sub_pair.select_one('div.schedule__groups').text
groups = "\n" + groups if 'групп' in groups else ""
comment = sub_pair.select_one(
'div.schedule__comment'
).text
comment = comment if comment != "" else None
full_name = f'{name}{groups}'
lesson = {
'numInDay': numInDay,
'numInShedule': numInDay,
'type': lesson_type,
'name': full_name,
'groupId': groupId,
'begin': begin_dt,
'end': end_dt,
'teacherId': teacherId,
'place': place,
'addInfo': comment,
}
shedule.append(lesson)
weekday += 1
shedule = sorted(shedule, key=lambda d: d['begin'])
new_shedule = []
# Correct numInDay
for date, day in groupby(shedule, key=lambda d: d['begin'].date()):
day = list(day)
first_num = day[0]['numInDay'] - 1
for l in day:
l['numInDay'] -= first_num
new_shedule.append(l)
return new_shedule, teachers

View File

@ -1,26 +0,0 @@
from .a_sql import Database
class L9_DB:
"""Класс взаимодействия с базой пользователей L9
(перспектива для сайта)
"""
def __init__(self, user, password):
self.db = Database('localhost', user, password)
self.db.initDatabase('l9_db')
self.db.executeFile('l9')
def countUsers(self) -> int:
return len(self.db.get('users', None, ['l9Id']))
def initUser(self, uid):
result = self.db.get('users', f"l9Id = {uid}", ["l9Id"])
if result == []:
l9Id = self.db.newID('users', "l9Id")
user = {"l9Id": l9Id}
self.db.insert('users', user)
else:
l9Id = result[0][0]
return l9Id

View File

@ -1,7 +0,0 @@
-- Пользователи системы
CREATE TABLE IF NOT EXISTS `users` (
`l9Id` bigint NOT NULL,
-- Идентификатор пользователя системы
PRIMARY KEY (`l9Id`)
);

View File

@ -1,197 +0,0 @@
from .l9 import L9_DB
from .a_ssau_parser import *
import telegram
from configparser import ConfigParser
import datetime
from itertools import groupby
class Shedule_DB:
"""Класс взаимодействия с базой расписания"""
def __init__(self, l9lk: L9_DB, first_week):
self.l9lk = l9lk
self.db = l9lk.db
self.first_week = first_week
self.db.executeFile('shedule')
def checkGroupExists(self, groupName: str, l9Id: str) -> str:
"""Проверка наличия группы в БД и на сайте, а также проверка,
что пользователь ещё не подключен к группе
'OK;N_группы;Назв-е_спец-сти' - пользователь успешно подключен \n
'Exists' - пользователь уже подключен к данной группе \n
'ssau.ru/groupId=*' - группа отсутствует в базе, но есть на сайте \n
'Error' - ошибка на стороне сайта
'Empty' - группа нигде не обнаружена
"""
groupIdInDB = self.l9lk.db.get(
'groups',
f'groupName LIKE "{groupName}%"',
['groupId', 'groupName', 'specName'],
)
if groupIdInDB != []:
groupIdInDB = groupIdInDB[0]
exists = self.l9lk.db.get(
'groups_users',
f'l9Id = {l9Id} AND groupId = {groupIdInDB[0]}',
)
if exists == []:
self.l9lk.db.insert(
'groups_users',
{'l9Id': l9Id, 'groupId': groupIdInDB[0]},
)
return f'OK;{groupIdInDB[1]};{groupIdInDB[2]}'
else:
return 'Exists'
else:
group = findInRasp(groupName)
if group != None:
group_url = f'ssau.ru/{group["url"][2:]}'
gr_num = group["text"]
groupId = group["id"]
return group_url
elif group == 'Error':
return 'Error'
else:
return 'Empty'
def loadShedule(self, groupId: str, date: datetime.datetime):
"""Загрузка расписания"""
week = date.isocalendar()[1] - self.first_week
self.db.execute(
f'DELETE FROM `lessons` WHERE WEEK(`begin`, 1) = {date.isocalendar()[1]} AND groupId = {groupId};'
)
t_info = self.db.get('teachers', None, teacher_columns)
t_info = [dict(zip(teacher_columns, i)) for i in t_info]
lessons, teachers = parseWeek(groupId, week, t_info)
g = getGroupInfo(groupId)
self.db.insert('groups', g)
for t in teachers:
self.l9lk.db.insert('teachers', t)
for l in lessons:
self.l9lk.db.insert('lessons', l)
def getGroups(self, l9Id: str):
groups = self.db.execute(
(
f'SELECT g.groupId, groupName FROM '
f'`groups_users` AS gu JOIN `groups` AS g '
'ON gu.groupId=g.groupId WHERE '
f'l9Id = {l9Id}'
)
).fetchall()
return groups if groups != [] else None
def getLesson(self, lessonId):
icons = {'other': '📙', 'lect': '📗', 'lab': '📘', 'pract': '📕'}
lesson = self.db.get('lessons', f'lessonId = {lessonId}')
if lesson != []:
lesson = lesson[0]
teacher = None
if lesson[12] != None:
teacher = self.db.get(
'teachers', f'teacherId = {lesson[12]}'
)
if teacher != None and teacher != []:
info = teacher[0]
teacher = f"{info[1]} {info[2][0]}.{info[3][0]}."
json_lesson = {
'numInDay': lesson[5],
'type': icons[lesson[7]],
'name': lesson[8],
'place': lesson[13],
'teacher': teacher,
'add_info': lesson[14],
'begin': lesson[10],
'end': lesson[11],
}
return json_lesson
else:
return {'empty'}
def strLesson(self, lessonIds):
lesson = [self.getLesson(i) for i in lessonIds]
begin = lesson[0]['begin']
end = lesson[0]['end']
text = "\n📆 %02i:%02i - %02i:%02i" % (
begin.hour,
begin.minute,
end.hour,
end.minute,
)
for l in lesson:
add_info = "" if l['add_info'] == None else "\n" + l['add_info']
teacher = "" if l['teacher'] == None else "\n👤 " + l['teacher']
place = "" if l['place'] == None else f"\n🧭 {l['place']}"
text += f"\n{l['type']} {l['name']}{place}{teacher}{add_info}\n"
return text
def nearLesson(self, date: datetime.datetime, l9Id: str, retry=False):
str_time = date.isoformat(sep=' ')
groupIds = self.getGroups(l9Id)
if groupIds != None:
second_gr = (
f' OR groupId = {groupIds[1][0]}'
if len(groupIds) == 2
else ''
)
lessonId = self.db.get(
'lessons',
f"(groupId = {groupIds[0][0]}{second_gr}) AND `end` > '{str_time}' "
'ORDER BY `begin` LIMIT 4',
['lessonId', 'begin'],
)
if lessonId != []:
if len(lessonId) >= 2:
pairs = [
list(pair)
for begin, pair in groupby(
lessonId, key=lambda d: d[1]
)
]
if (
len(pairs) >= 2
and pairs[0][0][1].date() != pairs[1][0][1].date()
):
pairs = [pairs[0]]
elif (
len(pairs) > 2
and pairs[0][0][1].date() == pairs[1][0][1].date()
):
pairs = pairs[:2]
else:
pairs = [lessonId[0]]
return pairs
elif not retry:
for groupId in [i for i in groupIds if i[0] > 1000]:
self.loadShedule(
groupId[0], date + datetime.timedelta(days=7)
)
return self.nearLesson(date, l9Id, retry=True)

View File

@ -1,125 +0,0 @@
-- Учебные группы
CREATE TABLE IF NOT EXISTS `groups` (
`groupId` bigint NOT NULL,
-- Идентификатор группы, соответствует ID в расписании на сайте
`groupName` varchar(4) DEFAULT '0000',
-- Учебный номер (наименование) группы
`specName` text,
-- Код и название направления подготовки
PRIMARY KEY (`groupId`)
);
-- Сведения о группах пользователя и настройках каждой группы
CREATE TABLE IF NOT EXISTS `groups_users` (
`guId` bigint NOT NULL AUTO_INCREMENT,
-- (service) Идентификатор сведения, устанавливается автоматически
`l9Id` bigint NOT NULL,
-- Идентификатор пользователя системы
`groupId` bigint NOT NULL,
-- ID группы, которой принадлежит пользователь
`firstTime` int DEFAULT '45',
-- Время в минутах, за которое приходит уведомление о начале занятий
`firstNote` tinyint DEFAULT '1',
-- Состояние уведомлений о начале занятий:
-- 0 - выключены
-- ненулевое значение - включены
`nextNote` tinyint DEFAULT '1',
-- Состояние уведомлений о первой или следующей паре
-- 0 - выключены
-- ненулевое значение - включены
PRIMARY KEY (`guId`),
KEY `guid_idx` (`l9Id`),
KEY `gid_idx` (`groupId`),
CONSTRAINT `gr_gu` FOREIGN KEY (`groupId`) REFERENCES `groups` (`groupId`) ON DELETE CASCADE ON UPDATE CASCADE,
CONSTRAINT `l9_gu` FOREIGN KEY (`l9Id`) REFERENCES `users` (`l9Id`) ON DELETE CASCADE ON UPDATE CASCADE
);
-- Преподаватели
CREATE TABLE IF NOT EXISTS `teachers` (
`teacherId` bigint NOT NULL,
-- Идентификатор преподавателя, соответствует ID на сайте
`surname` varchar(45) DEFAULT 'Brzęczyszczykiewicz',
`name` varchar(45) DEFAULT 'Grzegorz',
`midname` varchar(45) DEFAULT 'Chrząszczyżewoszywicz',
-- ФИО преподавателя
PRIMARY KEY (`teacherId`)
);
-- Занятия
CREATE TABLE IF NOT EXISTS `lessons` (
`lessonId` bigint NOT NULL AUTO_INCREMENT,
-- (service) Идентификатор занятия, устанавливается автоматически
`addedBy` varchar(4) DEFAULT 'ssau',
-- Источник информации о занятии:
-- 'ssau' - сайт Университета
-- 'lk' - Личный кабинет сайта Университета
-- '`l9Id`' - добавлено пользователем
`cancelled` bigint DEFAULT '0',
-- Отметка, является ли занятие отменённым
-- '0' - занятие НЕ отменено
-- '`l9Id`' - занятие отменено пользователем
`migratedTo` bigint DEFAULT '0',
-- Отметка, является ли занятие перенесённым КУДА-ТО
-- '0' - занятие НЕ перенесено
-- '`lessonId`' - занятие перенесено на другое время
`migratedFrom` bigint DEFAULT '0',
-- Отметка, является ли занятие перенесённым ОТКУДА-ТО
-- '0' - занятие НЕ перенесено
-- '`lessonId`' - занятие перенесено c другой пары
`numInDay` int DEFAULT '1',
-- Порядковый номер занятия в текущем дне
`numInShedule` int DEFAULT '1',
-- Порядковый номер занятия относительно расписания на неделю
`type` char(5) DEFAULT 'other',
-- Тип занятия:
-- 'lect' - лекция
-- 'pract' - практика (семинар)
-- 'lab' - лабораторная работа
-- 'other' - прочие
`name` text,
-- Название занятия
`groupId` bigint NOT NULL,
-- ID учебной группы
`begin` datetime NOT NULL,
`end` datetime NOT NULL,
-- Начало и конец занятия
`teacherId` bigint DEFAULT NULL,
-- (опционально) ID преподавателя
`place` text,
-- (опционально) Учебная аудитория
`addInfo` text,
-- (опционально) Дополнительная информация
PRIMARY KEY (`lessonId`),
KEY `gr_l_idx` (`groupId`),
KEY `t_l_idx` (`teacherId`),
CONSTRAINT `group_l` FOREIGN KEY (`groupId`) REFERENCES `groups` (`groupId`) ON DELETE RESTRICT ON UPDATE RESTRICT,
CONSTRAINT `teach_l` FOREIGN KEY (`teacherId`) REFERENCES `teachers` (`teacherId`) ON DELETE SET NULL ON UPDATE CASCADE
);

View File

@ -1,51 +0,0 @@
from .l9 import L9_DB
import telegram
class TG_DB:
"""Класс взаимодействия с БД пользователей бота в Telegram"""
def __init__(self, l9lk: L9_DB):
self.l9lk = l9lk
self.db = l9lk.db
self.db.executeFile('tg')
def getTagM(self, query: telegram.Message) -> (str, str, str):
"""Получить тэг и l9Id пользователя из сообщения"""
tgId = query.from_user.id
name = f'{query.from_user.first_name or ""} {query.from_user.last_name or ""}'
l9Id = self.db.get('tg_users', f"tgId = {tgId}", ["l9Id"])
if l9Id == []:
l9Id = self.l9lk.initUser(0)
user = {"l9Id": l9Id, "tgId": tgId, "name": name}
self.db.insert('tg_users', user)
else:
l9Id = l9Id[0][0]
tag = self.db.get('tg_users', f"tgId = {tgId}", ["posTag"])[0][0]
return tag, l9Id, f'{tgId}\t{tag}\t{name}\tM:{query.text}'
def getTagC(self, query: telegram.CallbackQuery) -> (str, str, str):
"""Получить тэг и l9Id пользователя из CallbackQuery (кнопки)"""
tgId = query.from_user.id
name = f'{query.from_user.first_name or ""} {query.from_user.last_name or ""}'
l9Id = self.db.get('tg_users', f"tgId = {tgId}", ["l9Id"])
if l9Id == []:
l9Id = self.l9lk.initUser(0)
user = {"l9Id": l9Id, "tgId": tgId, "name": name}
self.db.insert('tg_users', user)
else:
l9Id = l9Id[0][0]
tag = self.db.get('tg_users', f"tgId = {tgId}", ["posTag"])[0][0]
return tag, l9Id, f'{tgId}\t{tag}\t{name}\tC:{query.data}'
def changeTag(self, tgId: int, tag: str) -> None:
"""Сменить тэг пользователя"""
self.db.update('tg_users', f"tgId = {tgId}", f"posTag = '{tag}'")

View File

@ -1,21 +0,0 @@
-- Сведения о пользователях бота Telegram
CREATE TABLE IF NOT EXISTS `tg_users` (
`l9Id` bigint NOT NULL,
-- Идентификатор пользователя системы
`tgId` bigint NOT NULL,
-- ID пользователя в Telegram
`name` TEXT,
-- (optional) Имя пользователя в Telegram
`posTag` varchar(30) DEFAULT 'not_started',
-- Позиция пользователя в диалоге с ботом:
-- (default) not_started - только что вступил в диалог
-- add - добавляет группу
-- conf_{groupId} - пользователь на стадии подтверждения выбранной группы
-- ready - готов к работе
PRIMARY KEY (`l9Id`),
CONSTRAINT `l9_tg` FOREIGN KEY (`l9Id`) REFERENCES `users` (`l9Id`) ON DELETE CASCADE ON UPDATE CASCADE
);

View File

@ -1,36 +0,0 @@
[etc]
oops=Ой!
cancel=Действие отменено
need_group=Действие отменено
❗️ Ты не подключен ни к одной группе, надо срочно это исправлять!
Введи номер своей группы
overlimit=Бот работает в тестовом режиме, поэтому количество пользователей временно ограничено.
К сожалению, в данный момент лимит превышен, поэтому доступ для вас закрыт 😢
Попробуйте зайти на следующей неделе, когда лимит будет повышен
hello=Привет! Я твой новый помощник, который подскажет тебе, какая сейчас пара, и будет напоминать о занятиях, чтобы ты ничего не упустил 🤗
Давай знакомиться! Введи свой номер группы (например, 2305 или 2305-240502D)
[group]
connected=Поздравляем, твоя группа %%s, направление "%%s", подключена!
exists=❗️Эта группа у тебя уже подключена
error=❗У меня этой группы пока нет, а сайте возникла какая-то ошибка.
Попробуйте позже
checkShedule:Такой группы у меня пока нет в базе, но она есть на сайте
%%s
Проверь, пожалуйста, что это твоя группа и нажми кнопку
empty=К сожалению, такой группы нет ни в моей базе, ни на сайте университета :(
loading=Загружаю расписание...
loaded=Расписание загружено!
nogroup=Возможно, ты написал не ту группу, попробуй снова
[shedule]
near=Ближайшая пара
next_days=❗️Сегодня пар нет
Ближайшая пара
tomorrow=завтра
today=Ближайшая пара сегодня
now=Сейчас
next=Далее
next_empty=Далее сегодня ничего не будет
no_shedule=Расписание пустое
Либо пары кончились, либо что-то пошло не так

View File

@ -1,32 +0,0 @@
from telegram import (
InlineKeyboardMarkup,
InlineKeyboardButton,
ReplyKeyboardMarkup,
KeyboardButton,
)
class Keyboard:
def confirm() -> InlineKeyboardMarkup:
"""Клавиатура Да/Нет"""
buttons = [
[
InlineKeyboardButton("Да", callback_data="yes"),
InlineKeyboardButton("Нет", callback_data="no"),
]
]
return InlineKeyboardMarkup(buttons)
def cancel() -> ReplyKeyboardMarkup:
"""Кнопка отмены"""
buttons = [[KeyboardButton("Отмена")]]
return ReplyKeyboardMarkup(
buttons, resize_keyboard=True, one_time_keyboard=True
)
def menu() -> ReplyKeyboardMarkup:
"""Кнопка Главного меню"""
buttons = [[KeyboardButton("Главное меню")]]
return ReplyKeyboardMarkup(
buttons, resize_keyboard=True, one_time_keyboard=True
)

View File

@ -1,16 +0,0 @@
# -*- coding: utf-8 -*-
import json
import os
def loadJSON(name):
path = f"{name}.json"
if os.path.exists(path):
with open(path, encoding='utf-8') as file:
return json.load(file)
def saveJSON(name, dct):
path = f"{name}.json"
with open(path, "w", encoding='utf-8') as file:
json.dump(dct, file, ensure_ascii=False, indent="\t")

View File

@ -1,14 +0,0 @@
month = (
"января",
"февраля",
"марта",
"апреля",
"мая",
"июня",
"июля",
"августа",
"сентября",
"октября",
"ноября",
"декабря",
)