-
Notifications
You must be signed in to change notification settings - Fork 112
/
bot.py
executable file
·408 lines (303 loc) · 11.4 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
#!/usr/bin/env python3
import logging
from time import sleep
import traceback
import sys
from html import escape
import pickledb
from telegram import ParseMode, TelegramError, Update
from telegram.ext import Updater, MessageHandler, CommandHandler, Filters
from telegram.ext.dispatcher import run_async
from config import BOTNAME, TOKEN
help_text = (
"Welcomes everyone that enters a group chat that this bot is a "
"part of. By default, only the person who invited the bot into "
"the group is able to change settings.\nCommands:\n\n"
"/welcome - Set welcome message\n"
"/goodbye - Set goodbye message\n"
"/disable\\_goodbye - Disable the goodbye message\n"
"/lock - Only the person who invited the bot can change messages\n"
"/unlock - Everyone can change messages\n"
'/quiet - Disable "Sorry, only the person who..." '
"& help messages\n"
'/unquiet - Enable "Sorry, only the person who..." '
"& help messages\n\n"
"You can use _$username_ and _$title_ as placeholders when setting"
" messages. [HTML formatting]"
"(https://core.telegram.org/bots/api#formatting-options) "
"is also supported.\n"
)
"""
Create database object
Database schema:
<chat_id> -> welcome message
<chat_id>_bye -> goodbye message
<chat_id>_adm -> user id of the user who invited the bot
<chat_id>_lck -> boolean if the bot is locked or unlocked
<chat_id>_quiet -> boolean if the bot is quieted
chats -> list of chat ids where the bot has received messages in.
"""
# Create database object
db = pickledb.load("bot.db", True)
if not db.get("chats"):
db.set("chats", [])
# Set up logging
root = logging.getLogger()
root.setLevel(logging.INFO)
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
@run_async
def send_async(context, *args, **kwargs):
context.bot.send_message(*args, **kwargs)
def check(update, context, override_lock=None):
"""
Perform some checks on the update. If checks were successful, returns True,
else sends an error message to the chat and returns False.
"""
chat_id = update.message.chat_id
chat_str = str(chat_id)
if chat_id > 0:
send_async(
context, chat_id=chat_id, text="Please add me to a group first!",
)
return False
locked = override_lock if override_lock is not None else db.get(chat_str + "_lck")
if locked and db.get(chat_str + "_adm") != update.message.from_user.id:
if not db.get(chat_str + "_quiet"):
send_async(
context,
chat_id=chat_id,
text="Sorry, only the person who invited me can do that.",
)
return False
return True
# Welcome a user to the chat
def welcome(update, context, new_member):
""" Welcomes a user to the chat """
message = update.message
chat_id = message.chat.id
logger.info(
"%s joined to chat %d (%s)",
escape(new_member.first_name),
chat_id,
escape(message.chat.title),
)
# Pull the custom message for this chat from the database
text = db.get(str(chat_id))
# Use default message if there's no custom one set
if text is None:
text = "Hello $username! Welcome to $title 😊"
# Replace placeholders and send message
text = text.replace("$username", new_member.first_name)
text = text.replace("$title", message.chat.title)
send_async(context, chat_id=chat_id, text=text, parse_mode=ParseMode.HTML)
# Welcome a user to the chat
def goodbye(update, context):
""" Sends goodbye message when a user left the chat """
message = update.message
chat_id = message.chat.id
logger.info(
"%s left chat %d (%s)",
escape(message.left_chat_member.first_name),
chat_id,
escape(message.chat.title),
)
# Pull the custom message for this chat from the database
text = db.get(str(chat_id) + "_bye")
# Goodbye was disabled
if text is False:
return
# Use default message if there's no custom one set
if text is None:
text = "Goodbye, $username!"
# Replace placeholders and send message
text = text.replace("$username", message.left_chat_member.first_name)
text = text.replace("$title", message.chat.title)
send_async(context, chat_id=chat_id, text=text, parse_mode=ParseMode.HTML)
# Introduce the bot to a chat its been added to
def introduce(update, context):
"""
Introduces the bot to a chat its been added to and saves the user id of the
user who invited us.
"""
chat_id = update.message.chat.id
invited = update.message.from_user.id
logger.info(
"Invited by %s to chat %d (%s)", invited, chat_id, update.message.chat.title,
)
db.set(str(chat_id) + "_adm", invited)
db.set(str(chat_id) + "_lck", True)
text = (
f"Hello {update.message.chat.title}! "
"I will now greet anyone who joins this chat with a "
"nice message 😊 \nCheck the /help command for more info!"
)
send_async(context, chat_id=chat_id, text=text)
# Print help text
def help(update, context):
""" Prints help text """
chat_id = update.message.chat.id
chat_str = str(chat_id)
if (
not db.get(chat_str + "_quiet")
or db.get(chat_str + "_adm") == update.message.from_user.id
):
send_async(
context,
chat_id=chat_id,
text=help_text,
parse_mode=ParseMode.MARKDOWN,
disable_web_page_preview=True,
)
# Set custom message
def set_welcome(update, context):
""" Sets custom welcome message """
chat_id = update.message.chat.id
# Check admin privilege and group context
if not check(update, context):
return
# Split message into words and remove mentions of the bot
message = update.message.text.partition(" ")[2]
# Only continue if there's a message
if not message:
send_async(
context,
chat_id=chat_id,
text="You need to send a message, too! For example:\n"
"<code>/welcome Hello $username, welcome to "
"$title!</code>",
parse_mode=ParseMode.HTML,
)
return
# Put message into database
db.set(str(chat_id), message)
send_async(context, chat_id=chat_id, text="Got it!")
# Set custom message
def set_goodbye(update, context):
""" Enables and sets custom goodbye message """
chat_id = update.message.chat.id
# Check admin privilege and group context
if not check(update, context):
return
# Split message into words and remove mentions of the bot
message = update.message.text.partition(" ")[2]
# Only continue if there's a message
if not message:
send_async(
context,
chat_id=chat_id,
text="You need to send a message, too! For example:\n"
"<code>/goodbye Goodbye, $username!</code>",
parse_mode=ParseMode.HTML,
)
return
# Put message into database
db.set(str(chat_id) + "_bye", message)
send_async(context, chat_id=chat_id, text="Got it!")
def disable_goodbye(update, context):
""" Disables the goodbye message """
chat_id = update.message.chat.id
# Check admin privilege and group context
if not check(update, context):
return
# Disable goodbye message
db.set(str(chat_id) + "_bye", False)
send_async(context, chat_id=chat_id, text="Got it!")
def lock(update, context):
""" Locks the chat, so only the invitee can change settings """
chat_id = update.message.chat.id
# Check admin privilege and group context
if not check(update, context, override_lock=True):
return
# Lock the bot for this chat
db.set(str(chat_id) + "_lck", True)
send_async(context, chat_id=chat_id, text="Got it!")
def quiet(update, context):
""" Quiets the chat, so no error messages will be sent """
chat_id = update.message.chat.id
# Check admin privilege and group context
if not check(update, context, override_lock=True):
return
# Lock the bot for this chat
db.set(str(chat_id) + "_quiet", True)
send_async(context, chat_id=chat_id, text="Got it!")
def unquiet(update, context):
""" Unquiets the chat """
chat_id = update.message.chat.id
# Check admin privilege and group context
if not check(update, context, override_lock=True):
return
# Lock the bot for this chat
db.set(str(chat_id) + "_quiet", False)
send_async(context, chat_id=chat_id, text="Got it!")
def unlock(update, context):
""" Unlocks the chat, so everyone can change settings """
chat_id = update.message.chat.id
# Check admin privilege and group context
if not check(update, context):
return
# Unlock the bot for this chat
db.set(str(chat_id) + "_lck", False)
send_async(context, chat_id=chat_id, text="Got it!")
def empty_message(update, context):
"""
Empty messages could be status messages, so we check them if there is a new
group member, someone left the chat or if the bot has been added somewhere.
"""
# Keep chatlist
chats = db.get("chats")
if update.message.chat.id not in chats:
chats.append(update.message.chat.id)
db.set("chats", chats)
logger.info("I have been added to %d chats" % len(chats))
if update.message.new_chat_members:
for new_member in update.message.new_chat_members:
# Bot was added to a group chat
if new_member.username == BOTNAME:
return introduce(update, context)
# Another user joined the chat
else:
return welcome(update, context, new_member)
# Someone left the chat
elif update.message.left_chat_member is not None:
if update.message.left_chat_member.username != BOTNAME:
return goodbye(update, context)
def error(update, context, **kwargs):
""" Error handling """
error = context.error
try:
if isinstance(error, TelegramError) and (
error.message == "Unauthorized"
or error.message == "Have no rights to send a message"
or "PEER_ID_INVALID" in error.message
):
chats = db.get("chats")
chats.remove(update.message.chat_id)
db.set("chats", chats)
logger.info("Removed chat_id %s from chat list" % update.message.chat_id)
else:
logger.error("An error (%s) occurred: %s" % (type(error), error.message))
except:
pass
def main():
# Create the Updater and pass it your bot's token.
updater = Updater(TOKEN, workers=10, use_context=True)
# Get the dispatcher to register handlers
dp = updater.dispatcher
dp.add_handler(CommandHandler("start", help))
dp.add_handler(CommandHandler("help", help))
dp.add_handler(CommandHandler("welcome", set_welcome))
dp.add_handler(CommandHandler("goodbye", set_goodbye))
dp.add_handler(CommandHandler("disable_goodbye", disable_goodbye))
dp.add_handler(CommandHandler("lock", lock))
dp.add_handler(CommandHandler("unlock", unlock))
dp.add_handler(CommandHandler("quiet", quiet))
dp.add_handler(CommandHandler("unquiet", unquiet))
dp.add_handler(MessageHandler(Filters.status_update, empty_message))
dp.add_error_handler(error)
updater.start_polling(timeout=30, clean=True)
updater.idle()
if __name__ == "__main__":
main()