import asyncio
import discord
import logging
import re
import functools
from copy import deepcopy
from collections import namedtuple
from typing import Union, Optional, Iterable, Callable, Awaitable
from datetime import datetime, timedelta
from multiprocessing import TimeoutError
from multiprocessing.pool import Pool
from redbot.core import Config
from redbot.core.bot import Red
from redbot.core.i18n import Translator
from redbot.core.commands import BadArgument, MemberConverter
try:
from redbot.core.modlog import get_modlog_channel as get_red_modlog_channel
except RuntimeError:
pass # running sphinx-build raises an error when importing this module
from .cache import MemoryCache
from . import errors
# Logger used by every function of this module.
log = logging.getLogger("red.laggron.warnsystem")
# Translation function: wraps the user-facing strings of this module.
_ = Translator("WarnSystem", __file__)
# A run of 15 to 21 digits ending the string. It is used with ``match`` in
# UnavailableMember._check_id, so the text must also start with that run.
id_pattern = re.compile(r"([0-9]{15,21})$")
class SafeMember:
    """
    String-only snapshot of a member, meant to be handed to ``str.format`` templates.

    Only a fixed set of attributes is copied, each converted with :py:class:`str`. Any other
    attribute lookup returns the object itself, so a template such as ``{member.foo.bar}``
    renders the member's name instead of exposing the underlying Discord object.

    Attributes missing from the wrapped object are stored as ``"None"`` (the same text an
    unset nickname already produces) instead of raising :py:exc:`AttributeError`. This matters
    for :class:`UnavailableMember`, which does not define ``nick``, ``discriminator``,
    ``color``, ``colour``, ``created_at`` or ``joined_at``.
    """

    def __init__(self, member: discord.Member) -> None:
        for attribute in (
            "name",
            "display_name",
            "nick",
            "id",
            "mention",
            "discriminator",
            "color",
            "colour",
            "created_at",
            "joined_at",
        ):
            setattr(self, attribute, str(getattr(member, attribute, None)))

    def __str__(self):
        return self.name

    def __getattr__(self, name):
        # only called for attributes that were not copied in __init__
        return self
class FakeRole:
    """
    Minimal stand-in for a role, used as the ``top_role`` of :class:`UnavailableMember`.

    It only carries the attributes read on a member's top role: a position of 0 (so the
    fake member never outranks anyone), an empty colour, and a name so that messages
    mentioning the member's top role can still be built.
    """

    position = 0
    colour = discord.Embed.Empty
    name = "Unknown"
class UnavailableMember(discord.abc.User, discord.abc.Messageable):
    """
    Lightweight replacement for a :class:`discord.Member` who is not in the guild.

    Only the user ID is known; the name is a placeholder. This allows acting on a user
    (for instance a hackban) without fetching their profile from Discord first.
    """

    def __init__(self, bot, state, user_id: int):
        self.bot = bot
        self._state = state
        self.id = user_id
        self.top_role = FakeRole()

    @classmethod
    def _check_id(cls, member_id):
        """Return ``member_id`` as an int, or raise ValueError if it isn't an ID-like string."""
        if id_pattern.match(member_id) is None:
            raise ValueError(f"You provided an invalid ID: {member_id}")
        return int(member_id)

    @classmethod
    async def convert(cls, ctx, text):
        """
        Command argument converter.

        A real guild member is returned when the standard member converter finds one.
        Otherwise the text has to be a valid ID, and an :class:`UnavailableMember` is built
        from it.
        """
        try:
            return await MemberConverter().convert(ctx, text)
        except BadArgument:
            pass  # not a member of the guild, fall back to a raw ID
        try:
            user_id = cls._check_id(text)
        except ValueError:
            raise BadArgument(
                _(
                    "The given member cannot be found.\n"
                    "If you're trying to hackban, the user ID is not valid."
                )
            )
        return cls(ctx.bot, ctx._state, user_id)

    @property
    def name(self):
        return "Unknown"

    @property
    def display_name(self):
        return "Unknown"

    @property
    def mention(self):
        return f"<@{self.id}>"

    @property
    def avatar_url(self):
        return ""

    def __str__(self):
        return "Unknown#0000"

    # the 3 following functions were copied from the discord.User class, credit to Rapptz
    # https://github.com/Rapptz/discord.py/blob/master/discord/user.py#L668

    @property
    def dm_channel(self):
        """Optional[:class:`DMChannel`]: The DM channel already known for this user, if any.

        When this is ``None``, a DM channel can be created with :meth:`create_dm`.
        """
        return self._state._get_private_channel_by_user(self.id)

    async def create_dm(self):
        """Return the :class:`DMChannel` with this user, creating it if it doesn't exist yet."""
        state = self._state
        existing = self.dm_channel
        if existing is None:
            payload = await state.http.start_private_message(self.id)
            return state.add_dm_channel(payload)
        return existing

    async def _get_channel(self):
        # used by discord.abc.Messageable to find where messages are sent
        return await self.create_dm()
[docs]class API:
"""
Interact with WarnSystem from your cog.
To import the cog and use the functions, type this in your code:
.. code-block:: python
warnsystem = bot.get_cog('WarnSystem').api
.. warning:: If ``warnsystem`` is :py:obj:`None`, the cog is
not loaded/installed. You won't be able to interact with
the API at this point.
.. tip:: You can get the cog version by doing this
.. code-block:: python
version = bot.get_cog('WarnSystem').__version__
"""
def __init__(self, bot: Red, config: Config, cache: MemoryCache):
self.bot = bot
self.data = config
self.cache = cache
self.re_pool = Pool(maxtasksperchild=1000)
self.regex_timeout = 1
self.warned_guilds = [] # see automod_check_for_autowarn
self.antispam = {} # see automod_process_antispam
self.antispam_warn_queue = {} # see automod_warn
self.automod_warn_task: asyncio.Task
def _get_datetime(self, time: int) -> datetime:
return datetime.fromtimestamp(int(time))
def _get_timedelta(self, time: int) -> timedelta:
return timedelta(seconds=int(time))
def _format_datetime(self, time: datetime):
return time.strftime("%a %d %B %Y %H:%M:%S")
def _format_timedelta(self, time: timedelta):
"""Format a timedelta object into a string"""
# blame python for not creating a strftime attribute
plural = lambda name, amount: name[1] if amount > 1 else name[0]
strings = []
seconds = time.total_seconds()
years, seconds = divmod(seconds, 31622400)
months, seconds = divmod(seconds, 2635200)
weeks, seconds = divmod(seconds, 604800)
days, seconds = divmod(seconds, 86400)
hours, seconds = divmod(seconds, 3600)
minutes, seconds = divmod(seconds, 60)
units = [years, months, weeks, days, hours, minutes, seconds]
# tuples inspired from mikeshardmind
# https://github.com/mikeshardmind/SinbadCogs/blob/v3/scheduler/time_utils.py#L29
units_name = {
0: (_("year"), _("years")),
1: (_("month"), _("months")),
2: (_("week"), _("weeks")),
3: (_("day"), _("days")),
4: (_("hour"), _("hours")),
5: (_("minute"), _("minutes")),
6: (_("second"), _("seconds")),
}
for i, value in enumerate(units):
if value < 1:
continue
unit_name = plural(units_name.get(i), value)
strings.append(f"{round(value)} {unit_name}")
string = ", ".join(strings[:-1])
if len(strings) > 1:
string += _(" and ") + strings[-1]
else:
string = strings[0]
return string
async def _start_timer(self, guild: discord.Guild, member: discord.Member, case: dict) -> bool:
"""Start the timer for a temporary mute/ban."""
if not case["duration"]:
raise errors.BadArgument("No duration for this warning!")
await self.cache.add_temp_action(guild, member, case)
return True
async def _mute(self, member: discord.Member, reason: Optional[str] = None):
    """
    Mute a user on the guild by giving them the mute role.

    If the guild enabled the ``remove_roles`` setting, the member's other roles are removed
    first. Only roles below the bot's top role and not managed by an integration are
    touched. Roles that could not be removed are logged and the mute goes on.

    Parameters
    ----------
    member: discord.Member
        The member to mute.
    reason: Optional[str]
        The reason shown in the audit logs.

    Returns
    -------
    list
        The roles the bot tried to remove from the member (empty if ``remove_roles`` is
        disabled), so they can be given back when unmuting.

    Raises
    ------
    ~warnsystem.errors.MissingMuteRole
        No mute role is set up on the guild.
    """
    guild = member.guild
    mute_role = guild.get_role(await self.cache.get_mute_role(guild))
    if not mute_role:
        raise errors.MissingMuteRole("You need to create the mute role before doing this.")
    old_roles = []
    if await self.data.guild(guild).remove_roles():
        bot_position = guild.me.top_role.position
        old_roles = [
            role
            for role in member.roles
            if role != guild.default_role and role.position < bot_position and not role.managed
        ]
        # roles are removed one by one so a single failure doesn't block the others
        fails = []
        for role in old_roles:
            try:
                await member.remove_roles(role, reason=reason)
            except discord.errors.HTTPException:
                fails.append(role)
        if fails:
            # Logger.warn is a deprecated alias of Logger.warning
            log.warning(
                f"[Guild {guild.id}] Failed to remove roles from {member} (ID: {member.id}) "
                f"while muting. Roles: {', '.join([f'{x.name} ({x.id})' for x in fails])}",
            )
    await member.add_roles(mute_role, reason=reason)
    return old_roles
async def _unmute(self, member: discord.Member, reason: str, old_roles: list = None):
"""Unmute an user on the guild."""
guild = member.guild
mute_role = guild.get_role(await self.cache.get_mute_role(guild))
if not mute_role:
raise errors.MissingMuteRole(
f"Lost the mute role on guild {guild.name} (ID: {guild.id}"
)
await member.remove_roles(mute_role, reason=reason)
await member.add_roles(*old_roles, reason=reason)
async def _create_case(
    self,
    guild: discord.Guild,
    user: discord.User,
    author: Union[discord.Member, str],
    level: int,
    time: datetime,
    reason: Optional[str] = None,
    duration: Optional[timedelta] = None,
    roles: Optional[list] = None,
    modlog_message: Optional[discord.Message] = None,
) -> dict:
    """
    Store a new case for a user and return it. Don't call this, call warn instead.

    The author is saved as an ID when it is a Discord user/member, and as given otherwise.
    The date is saved in seconds since epoch, the duration in seconds, and the roles as a
    list of IDs. If a modlog message is given, its channel and message IDs are saved too.
    """
    if isinstance(author, (discord.User, discord.Member)):
        stored_author = author.id
    else:
        stored_author = author
    data = {
        "level": level,
        "author": stored_author,
        "reason": reason,
        "time": int(time.timestamp()),  # seconds since epoch
        "duration": duration.total_seconds() if duration else None,
        "roles": [role.id for role in roles] if roles else [],
    }
    if modlog_message:
        data["modlog_message"] = {
            "channel_id": modlog_message.channel.id,
            "message_id": modlog_message.id,
        }
    user_logs = self.data.custom("MODLOGS", guild.id, user.id)
    async with user_logs.x() as logs:
        logs.append(data)
    return data
[docs] async def get_case(
self, guild: discord.Guild, user: Union[discord.User, discord.Member], index: int
) -> dict:
"""
Get a specific case for a user.
Parameters
----------
guild: discord.Guild
The guild of the member.
user: Union[discord.User, discord.Member]
The user you want to get the case from. Can be a :class:`discord.User` if the member is
not in the server.
index: int
The case index you want to get. Must be positive.
Returns
-------
dict
A :py:class:`dict` which has the following body:
.. code-block: python3
{
"level" : int, # between 1 and 5, the warning level
"author" : Union[discord.Member, str], # the member that warned the user
"reason" : Optional[str], # the reason of the warn, can be None
"time" : datetime.datetime, # the date when the warn was set
}
Raises
------
~warnsystem.errors.NotFound
The case requested doesn't exist.
"""
try:
case = (await self.data.custom("MODLOGS", guild.id, user.id).x())[index - 1]
except IndexError:
raise errors.NotFound("The case requested doesn't exist.")
else:
time = case["time"]
if time:
case["time"] = self._get_datetime(time)
return case
[docs] async def get_all_cases(
self, guild: discord.Guild, user: Optional[Union[discord.User, discord.Member]] = None
) -> list:
"""
Get all cases for a member of a guild.
Parameters
----------
guild: discord.Guild
The guild where you want to get the cases from.
user: Optional[Union[discord.User, discord.Member]]
The user you want to get the cases from. If this arguments is omitted, all cases of
the guild are returned.
Returns
-------
list
A list of all cases of a user/guild. The cases are sorted from the oldest to the
newest.
If you specified a user, you should get something like this:
.. code-block:: python3
[
{ # case #1
"level" : int, # between 1 and 5, the warning level
"author" : Union[discord.Member, str], # the member that warned the user
"reason" : Optional[str], # the reason of the warn, can be None
"time" : datetime.datetime, # the date when the warn was set
},
{
# case #2
},
# ...
]
However, if you didn't specify a user, you got all cases of the guild. As for the user,
you will get a :py:class:`list` of the cases, with another key for specifying the
warned user:
.. code-block:: python3
{ # case #1
"level" : int, # between 1 and 5, the warning level
"author" : Union[discord.Member, str], # the member that warned the user
"reason" : Optional[str], # the reason of the warn, can be None
"time" : datetime.datetime, # the date when the warn was set
"member" : discord.User, # the member warned, this key is specific to guild
}
"""
if user:
return await self.data.custom("MODLOGS", guild.id, user.id).x()
logs = await self.data.custom("MODLOGS", guild.id).all()
all_cases = []
for member, content in logs.items():
if member == "x":
continue
for log in content["x"]:
time = log["time"]
if time:
log["time"] = self._get_datetime(time)
# gotta get that state somehow
log["member"] = self.bot.get_user(int(member)) or UnavailableMember(
self.bot, self.bot.user._state, member
)
log["author"] = self.bot.get_user(int(log["author"])) or UnavailableMember(
self.bot, self.bot.user._state, log["author"]
)
all_cases.append(log)
return sorted(all_cases, key=lambda x: x["time"]) # sorted from oldest to newest
[docs] async def edit_case(
self,
guild: discord.Guild,
user: Union[discord.User, discord.Member],
index: int,
new_reason: str,
) -> bool:
"""
Edit the reason of a case.
Parameters
----------
guild: discord.Guild
The guild where you want to get the case from.
user: Union[discord.User, discord.Member]
The user you want to get the case from.
index: int
The number of the case you want to edit.
new_reason: str
The new reason to set.
Returns
-------
bool
:py:obj:`True` if the action succeeded.
Raises
------
~warnsystem.errors.BadArgument
The reason is above 1024 characters. Due to Discord embed rules, you have to make it
shorter.
~warnsystem.errors.NotFound
The case requested doesn't exist.
"""
if len(new_reason) > 1024:
raise errors.BadArgument("The reason must not be above 1024 characters.")
case = await self.get_case(guild, user, index)
case["reason"] = new_reason
case["time"] = int(case["time"].timestamp())
async with self.data.custom("MODLOGS", guild.id, user.id).x() as logs:
logs[index - 1] = case
return True
[docs] async def get_modlog_channel(
self, guild: discord.Guild, level: Optional[Union[int, str]] = None
) -> discord.TextChannel:
"""
Get the WarnSystem's modlog channel on the current guild.
When you call this, the channel is get with the following order:
#. Get the modlog channel associated to the type, if provided
#. Get the defult modlog channel set with WarnSystem
#. Get the Red's modlog channel associated to the server
Parameters
----------
guild: discord.Guild
The guild you want to get the modlog from.
level: Optional[Union[int, str]]
Can be an :py:class:`int` between 1 and 5, a :py:class:`str` (``"all"``)
or :py:obj:`None`.
* If the argument is omitted (or :py:obj:`None` is provided), the default modlog
channel will be returned.
* If an :py:class:`int` is given, the modlog channel associated to this warning
level will be returned. If a specific channel was not set for this level, the
default modlog channel will be returned instead.
* If ``"all"`` is returned, a :py:class:`dict` will be returned. It should be built
like this:
.. code-block:: python3
{
"main" : 012345678987654321,
"1" : None,
"2" : None,
"3" : None,
"4" : 478065433996537900,
"5" : 567943553912O46428,
}
A dict with the possible channels is returned, associated with an :py:class:`int`
corresponding to the channel ID set, or :py:obj:`None` if it was not set.
For technical reasons, the default channel is actually named ``"main"`` in the dict.
Returns
-------
channel: discord.TextChannel
The channel requested.
.. note:: It can be :py:obj:`None` if the channel doesn't exist anymore.
Raises
------
~warnsystem.errors.NotFound
There is no modlog channel set with WarnSystem or Red, ask the user to set one.
"""
# raise errors if the arguments are wrong
if level:
msg = "The level must be an int between 1 and 5 ; or a string that " 'should be "all"'
if not isinstance(level, int) and level != "all":
raise errors.InvalidLevel(msg)
elif isinstance(level, int) and not 1 <= level <= 5:
raise errors.InvalidLevel(msg)
if level == "all":
return await self.data.guild(guild).channels.all()
default_channel = await self.data.guild(guild).channels.main()
if level:
channel = await self.data.guild(guild).channels.get_raw(str(level))
else:
return default_channel
if not default_channel and not channel:
# warnsystem default channel doesn't exist, let's try to get Red's one
try:
return await get_red_modlog_channel(guild)
except RuntimeError:
raise errors.NotFound("No modlog found from WarnSystem or Red")
return self.bot.get_channel(channel if channel else default_channel)
async def get_embeds(
    self,
    guild: discord.Guild,
    member: Union[discord.Member, UnavailableMember],
    author: Union[discord.Member, str],
    level: int,
    reason: Optional[str] = None,
    time: Optional[timedelta] = None,
    date: Optional[datetime] = None,
    message_sent: bool = True,
) -> tuple:
    """
    Return two embeds, one for the modlog and one for the member.

    .. warning:: Unlike for the warning, the arguments are not checked and won't raise errors
        if they are wrong. It is recommended to call :func:`~warnsystem.api.API.warn` and let
        it generate the embeds instead.

    Parameters
    ----------
    guild: discord.Guild
        The Discord guild where the warning takes place.
    member: Union[discord.Member, UnavailableMember]
        The warned member. Should only be :class:`UnavailableMember` in case of a hack ban.
    author: Union[discord.Member, str]
        The moderator that warned the user. If it's not a Discord user, you can specify a
        :py:class:`str` instead (e.g. "Automod"), which is then displayed as is.
    level: int
        The level of the warning which should be between 1 and 5.
    reason: Optional[str]
        The reason of the warning.
    time: Optional[timedelta]
        The time before the action ends. Only for mute and ban.
    date: Optional[datetime]
        When the action was taken. The current UTC time is used for the ``{time}``
        placeholder of the descriptions if omitted.
    message_sent: bool
        Set to :py:obj:`False` if the embed couldn't be sent to the warned user.

    Returns
    -------
    tuple
        A :py:class:`tuple` with the modlog embed at index 0, and the user embed at index 1.
    """
    unknown = _("unknown")
    # (singular, plural) for each level; the fallback is a tuple too since it is indexed
    action = {
        1: (_("warn"), _("warns")),
        2: (_("mute"), _("mutes")),
        3: (_("kick"), _("kicks")),
        4: (_("softban"), _("softbans")),
        5: (_("ban"), _("bans")),
    }.get(level, (unknown, unknown))
    mod_message = ""
    if not reason:
        reason = _("No reason was provided.")
        mod_message = _("\nEdit this with `[p]warnings {id}`").format(id=member.id)
    guild_data = self.data.guild(guild)
    logs = await self.data.custom("MODLOGS", guild.id, member.id).x()

    # prepare the status field, counting the warning being issued
    total_warns = len(logs) + 1
    total_type_warns = (
        len([x for x in logs if x["level"] == level]) + 1
    )  # number of warns of the received type

    def current_status(third_person: bool) -> str:
        # third person sentence for the modlog, second person sentence for the DM
        return _("{who} now {verb} {total} {warning} ({total_type} {action})").format(
            who=_("The member") if third_person else _("You"),
            verb=_("has") if third_person else _("have"),
            total=total_warns,
            warning=_("warnings") if total_warns > 1 else _("warning"),
            total_type=total_type_warns,
            action=action[1] if total_type_warns > 1 else action[0],
        )

    # we set any value that can be used multiple times
    log_description = await guild_data.embed_description_modlog.get_raw(level)
    user_description = await guild_data.embed_description_user.get_raw(level)
    invite = None
    if "{invite}" in log_description or "{invite}" in user_description:
        # a single invite is shared by both descriptions; failing to create it is not fatal
        try:
            invite = await guild.create_invite(max_uses=1)
        except Exception:
            invite = _("*[couldn't create an invite]*")
    # always a formatted string, whether the date was given or not
    today = (date or datetime.utcnow()).strftime("%a %d %B %Y %H:%M")
    if time:
        duration = self._format_timedelta(time)
    else:
        duration = _("*[No time given]*")
    author_is_str = isinstance(author, str)

    def format_description(text):
        try:
            return text.format(
                invite=invite,
                member=SafeMember(member),
                # a plain string has none of the attributes SafeMember copies
                mod=author if author_is_str else SafeMember(author),
                duration=duration,
                time=today,
            )
        except Exception:
            log.error(
                f"[Guild {guild.id}] Failed to format description in embed", exc_info=True
            )
            return "Failed to format field."

    # an image link found in the reason is displayed in the embed
    link = re.search(r"(https?://)\S+\.(jpg|jpeg|png|gif|webm)", reason)

    # embed for the modlog
    log_embed = discord.Embed()
    log_embed.set_author(name=f"{member.name} | {member.id}", icon_url=member.avatar_url)
    log_embed.title = _("Level {level} warning ({action})").format(
        level=level, action=action[0]
    )
    log_embed.description = format_description(log_description)
    log_embed.add_field(name=_("Member"), value=member.mention, inline=True)
    log_embed.add_field(
        name=_("Moderator"),
        value=author if author_is_str else author.mention,
        inline=True,
    )
    if time:
        log_embed.add_field(name=_("Duration"), value=duration, inline=True)
    log_embed.add_field(name=_("Reason"), value=reason + mod_message, inline=False)
    log_embed.add_field(name=_("Status"), value=current_status(True), inline=False)
    log_embed.timestamp = date
    log_embed.set_thumbnail(url=await guild_data.thumbnails.get_raw(level))
    log_embed.colour = await guild_data.colors.get_raw(level)
    log_embed.url = await guild_data.url()
    log_embed.set_image(url=link.group() if link else "")
    if not message_sent:
        log_embed.description += _(
            "\n\n***The message could not be delivered to the user. They may have DMs "
            "disabled, blocked the bot, or may not have a mutual server.***"
        )

    # embed for the member in DM, built from a copy of the modlog embed
    # fields at this point: Member, Moderator, [Duration], Reason, Status
    reason_index = 3 if time else 2
    user_embed = deepcopy(log_embed)
    user_embed.set_author(name="")
    user_embed.description = format_description(user_description)
    if mod_message:
        # the member doesn't need the hint about editing the reason
        user_embed.set_field_at(reason_index, name=_("Reason"), value=reason, inline=False)
    user_embed.remove_field(reason_index + 1)  # removes status field (gonna be added back)
    user_embed.remove_field(0)  # removes member field
    user_embed.add_field(name=_("Status"), value=current_status(False), inline=False)
    if time:
        user_embed.set_field_at(1, name=_("Duration"), value=duration, inline=True)
    if not await guild_data.show_mod():
        user_embed.remove_field(0)  # called twice, removing moderator field
    return (log_embed, user_embed)
[docs] async def maybe_create_mute_role(self, guild: discord.Guild) -> bool:
"""
Create the mod role for WarnSystem if it doesn't exist.
This will also edit all channels to deny the following permissions to this role:
* ``send_messages``
* ``add_reactions``
* ``speak``
Parameters
----------
guild: discord.Guild
The guild you want to set up the mute in.
Returns
-------
Union[bool, list]
* :py:obj:`False` if the role already exists.
* :py:class:`list` if the role was created, with a list of errors for each channel.
Empty list means completly successful edition.
Raises
------
~warnsystem.errors.MissingPermissions
The bot lacks the :attr:`discord.Permissions.create_roles` permission.
discord.errors.HTTPException
Creating the role failed.
"""
role = await self.cache.get_mute_role(guild)
role = guild.get_role(role)
if role:
return False
if not guild.me.guild_permissions.manage_roles:
raise errors.MissingPermissions(
_("I can't manage roles, please give me this permission to continue.")
)
# no mod role on this guild, let's create one
role = await guild.create_role(
name="Muted",
reason=_(
"WarnSystem mute role. This role will be assigned to the muted members, "
"feel free to move it or modify its channel permissions."
),
)
await asyncio.sleep(0.5) # prevents an error when repositionning the role
await role.edit(
position=guild.me.top_role.position - 1,
reason=_(
"Modifying role's position, keep it under my top role so "
"I can add it to muted members."
),
)
perms = discord.PermissionOverwrite(send_messages=False, add_reactions=False, speak=False)
errors = []
for channel in guild.channels:
try:
await channel.set_permissions(
target=role,
overwrite=perms,
reason=_(
"Setting up WarnSystem mute. All muted members will have this role, "
"feel free to edit its permissions."
),
)
except discord.errors.Forbidden:
errors.append(
_(
"Cannot edit permissions of the channel {channel} because of a "
"permission error (probably enforced permission for `Manage channel`)."
).format(channel=channel.mention)
)
except discord.errors.HTTPException as e:
errors.append(
_(
"Cannot edit permissions of the channel {channel} because of "
"an unknown error."
).format(channel=channel.mention)
)
log.warn(
f"[Guild {guild.id}] Couldn't edit permissions of {channel} (ID: "
f"{channel.id}) for setting up the mute role because of an HTTPException.",
exc_info=e,
)
except Exception as e:
errors.append(
_(
"Cannot edit permissions of the channel {channel} because of "
"an unknown error."
).format(channel=channel.mention)
)
log.error(
f"[Guild {guild.id}] Couldn't edit permissions of {channel} (ID: "
f"{channel.id}) for setting up the mute role because of an unknwon error.",
exc_info=e,
)
await self.cache.update_mute_role(guild, role)
return errors
async def warn(
    self,
    guild: discord.Guild,
    members: Iterable[Union[discord.Member, UnavailableMember]],
    author: Union[discord.Member, str],
    level: int,
    reason: Optional[str] = None,
    time: Optional[timedelta] = None,
    date: Optional[datetime] = None,
    ban_days: Optional[int] = None,
    log_modlog: Optional[bool] = True,
    log_dm: Optional[bool] = True,
    take_action: Optional[bool] = True,
    automod: Optional[bool] = True,
    progress_tracker: Optional[Callable[[int], Awaitable[None]]] = None,
) -> list:
    """
    Set a warning on members of a Discord guild and log it with the WarnSystem system.

    .. tip:: The message that comes with the following exceptions are already
        translated and ready to be sent to Discord:

        *   :class:`~warnsystem.errors.NotFound`
        *   :class:`~warnsystem.errors.LostPermissions`
        *   :class:`~warnsystem.errors.MemberTooHigh`
        *   :class:`~warnsystem.errors.MissingPermissions`
        *   :class:`~warnsystem.errors.SuicidePrevention`

    Parameters
    ----------
    guild: discord.Guild
        The guild of the members to warn
    members: Iterable[Union[discord.Member, UnavailableMember]]
        The members that will be warned. They can be instances of
        :py:class:`warnsystem.api.UnavailableMember` if you need
        to ban someone not in the guild.
    author: Union[discord.Member, str]
        The member that called the action, which will be associated to the log. A
        :py:class:`str` can be given if this is not a Discord user (e.g. "Automod"), in
        which case the hierarchy check between the moderator and the member is skipped.
    level: int
        An :py:class:`int` between 1 and 5, specifying the warning level:

        #.  Simple DM warning
        #.  Mute (can be temporary)
        #.  Kick
        #.  Softban
        #.  Ban (can be temporary ban, or hack ban, if the member is not in the server)
    reason: Optional[str]
        The optional reason of the warning. It is strongly recommended to set one.
    time: Optional[timedelta]
        The time before cancelling the action. This only works for a mute or a ban.
    date: Optional[datetime]
        When the action was taken. Only use if you want to overwrite the current date and time.
    ban_days: Optional[int]
        Overwrite number of days of messages to delete for a ban. Only used for warnings
        level 4 or 5. If this is omitted, the bot will fall back to the user defined setting.
        ``0`` is a valid value and means no message is deleted.
    log_modlog: Optional[bool]
        Specify if an embed should be posted to the modlog channel. Default to :py:obj:`True`.
    log_dm: Optional[bool]
        Specify if an embed should be sent to the warned user. Default to :py:obj:`True`.
    take_action: Optional[bool]
        Specify if the bot should take action on the member (mute, kick, softban, ban). If set
        to :py:obj:`False`, the bot will only send a log embed to the member and in the modlog.
        Default to :py:obj:`True`.
    automod: Optional[bool]
        Set to :py:obj:`False` to skip automod, preventing multiple warnings at once and
        saving performances. Automod might trigger on a next warning though.
    progress_tracker: Optional[Callable[[int], Awaitable[None]]]
        an async callable (function or lambda) which takes one argument to follow the progress
        of the warn. The argument is the number of warns committed. Here's an example:

        .. code-block:: python3

            message = await ctx.send("Mass warn started...")

            async def update_count(count):
                await message.edit(content=f"{count}/{len(members)} members warned.")

            await api.warn(guild, members, ctx.author, 1, progress_tracker=update_count)

    Returns
    -------
    list
        The exceptions related to the members which couldn't be warned. Empty if every
        member was warned.

    Raises
    ------
    ~warnsystem.errors.InvalidLevel
        The level must be an :py:class:`int` between 1 and 5.
    ~warnsystem.errors.BadArgument
        You need to provide a valid :class:`discord.Member` object, except for a
        hackban where a :class:`discord.User` works.
    ~warnsystem.errors.NotFound
        No modlog channel can be found while ``log_modlog`` is enabled.
    ~warnsystem.errors.MissingMuteRole
        You're trying to mute someone but the mute role was not setup yet.
        You can fix this by calling :func:`~warnsystem.api.API.maybe_create_mute_role`.
    ~warnsystem.errors.LostPermissions
        The bot lost a permission to do something (it had the perm before). This
        can be lost permissions for sending messages to the modlog channel or
        interacting with the mute role.
    ~warnsystem.errors.MissingPermissions
        The bot lacks a permissions to do something. Can be adding role, kicking
        or banning members.
    discord.errors.NotFound
        When the user ID provided for hackban isn't recognized by Discord.
    discord.errors.HTTPException
        Unknown error from Discord API. It's recommended to catch this
        potential error too.

    .. note:: :class:`~warnsystem.errors.MemberTooHigh`,
        :class:`~warnsystem.errors.NotAllowedByHierarchy` and the other errors specific to
        one member are not raised but returned in the list, so the remaining members can
        still be warned.
    """
    author_is_str = isinstance(author, str)
    warned_count = 0
    mod_channel = None

    async def warn_member(member: Union[discord.Member, UnavailableMember]):
        """Warn a single member. Returns an exception if they couldn't be warned."""
        nonlocal warned_count
        roles = []
        # permissions check
        if level > 1 and guild.me.top_role.position <= member.top_role.position:
            # check if the member is below the bot in the roles's hierarchy
            return errors.MemberTooHigh(
                _(
                    "Cannot take actions on this member, they are "
                    "above me in the roles hierarchy. Modify "
                    "the hierarchy so my top role ({bot_role}) is above {member_role}."
                ).format(bot_role=guild.me.top_role.name, member_role=member.top_role.name)
            )
        if (
            not author_is_str  # a str author has no role to compare
            and await self.data.guild(guild).respect_hierarchy()
            and not (await self.bot.is_owner(author) or author.id == guild.owner_id)
            and member.top_role.position >= author.top_role.position
        ):
            return errors.NotAllowedByHierarchy(
                "The moderator is lower than the member in the servers's role hierarchy."
            )
        if level > 2 and member.id == guild.owner_id:
            return errors.MissingPermissions(
                _("I can't take actions on the owner of the guild.")
            )
        if member == guild.me:
            return errors.SuicidePrevention(
                _(
                    "Why would you warn me? I did nothing wrong :c\n"
                    "(use a manual kick/ban instead, warning the bot will cause issues)"
                )
            )

        # send the message to the user
        if log_modlog or log_dm:
            modlog_e, user_e = await self.get_embeds(
                guild, member, author, level, reason, time, date
            )
        if log_dm:
            try:
                await member.send(embed=user_e)
            except (discord.errors.Forbidden, errors.UserNotFound):
                modlog_e = (
                    await self.get_embeds(
                        guild, member, author, level, reason, time, date, message_sent=False
                    )
                )[0]
            except discord.errors.NotFound:
                raise
            except discord.errors.HTTPException as e:
                modlog_e = (
                    await self.get_embeds(
                        guild, member, author, level, reason, time, date, message_sent=False
                    )
                )[0]
                log.warning(
                    f"[Guild {guild.id}] Couldn't send a message to {member} "
                    f"(ID: {member.id}) because of an HTTPException.",
                    exc_info=e,
                )

        # take actions
        if take_action:
            # formatted once with every value, so braces in names or in the reason are
            # never interpreted as placeholders
            audit_reason = (
                audit_header.format(author=author, action=action, member=member)
                + audit_details
            )
            try:
                if level == 2:
                    roles = await self._mute(member, audit_reason)
                elif level == 3:
                    await guild.kick(member, reason=audit_reason)
                elif level == 4:
                    await guild.ban(
                        member,
                        reason=audit_reason,
                        delete_message_days=ban_days
                        if ban_days is not None
                        else await self.data.guild(guild).bandays.softban(),
                    )
                    await guild.unban(
                        member,
                        reason=_(
                            "Unbanning the softbanned member after cleaning up the messages."
                        ),
                    )
                elif level == 5:
                    await guild.ban(
                        member,
                        reason=audit_reason,
                        delete_message_days=ban_days
                        if ban_days is not None
                        else await self.data.guild(guild).bandays.ban(),
                    )
            except discord.errors.HTTPException as e:
                log.warning(
                    f"[Guild {guild.id}] Failed to warn {member} because of "
                    "an unknown error from Discord.",
                    exc_info=e,
                )
                return e

        # actions were taken, time to log
        if log_modlog:
            modlog_message = await mod_channel.send(embed=modlog_e)
        else:
            modlog_message = None
        data = await self._create_case(
            guild, member, author, level, date, reason, time, roles, modlog_message
        )
        # start timer if there is a temporary warning
        if time and (level == 2 or level == 5):
            await self._start_timer(guild, member, data)
        if automod:
            # This function can be pretty heavy, and the response can be seriously delayed
            # because of this, so we make it a side process instead
            self.bot.loop.create_task(
                self.automod_check_for_autowarn(guild, member, author, level)
            )
        warned_count += 1
        if progress_tracker:
            await progress_tracker(warned_count)

    if not 1 <= level <= 5:
        raise errors.InvalidLevel("The level must be between 1 and 5.")
    # we get the modlog channel now to make sure it exists before doing anything
    if log_modlog:
        mod_channel = await self.get_modlog_channel(guild, level)
        if mod_channel is None:
            # the channel saved doesn't exist anymore; failing now is better than failing
            # once the action is taken, which would leave it without any log
            raise errors.NotFound(
                _("The modlog channel cannot be found, please set a new one.")
            )
    # check if the mute role exists
    mute_role = guild.get_role(await self.cache.get_mute_role(guild))
    if not mute_role and level == 2:
        raise errors.MissingMuteRole("You need to create the mute role before doing this.")
    # we check for all permission problem that can occur before calling the API
    # checks if the bot has send_messages and embed_links permissions for the modlog
    if log_modlog and not (
        guild.me.guild_permissions.send_messages and guild.me.guild_permissions.embed_links
    ):
        raise errors.LostPermissions(
            _(
                "I need the `Send messages` and `Embed links` "
                "permissions in {channel} to do this."
            ).format(channel=mod_channel.mention)
        )
    if level == 2:
        # mute with role
        if not guild.me.guild_permissions.manage_roles:
            raise errors.MissingPermissions(
                _("I can't manage roles, please give me this permission to continue.")
            )
        if mute_role.position >= guild.me.top_role.position:
            raise errors.LostPermissions(
                _(
                    "The mute role `{mute_role}` was moved above my top role `{my_role}`. "
                    "Please move the roles so my top role is above the mute role."
                ).format(mute_role=mute_role.name, my_role=guild.me.top_role.name)
            )
    if level == 3:
        # kick
        if not guild.me.guild_permissions.kick_members:
            raise errors.MissingPermissions(
                _("I can't kick members, please give me this permission to continue.")
            )
    if level >= 4:
        # softban or ban
        if not guild.me.guild_permissions.ban_members:
            raise errors.MissingPermissions(
                _("I can't ban members, please give me this permission to continue.")
            )

    # prepare the reason shown in the audit logs: a header formatted for each member,
    # followed by details common to all members
    action = {1: _("warn"), 2: _("mute"), 3: _("kick"), 4: _("softban"), 5: _("ban")}.get(
        level, _("unknown")
    )
    if author_is_str:
        audit_header = _("[WarnSystem] {action} requested by {author} against {member}. ")
    else:
        audit_header = _(
            "[WarnSystem] {action} requested by {author} (ID: {author.id}) against {member}. "
        )
    audit_details = ""
    if time:
        audit_details += _("\n\nDuration: {time} ").format(time=self._format_timedelta(time))
    if reason:
        # the length is estimated with the member left as a placeholder
        preview = (
            audit_header.format(author=author, action=action, member="{member}")
            + audit_details
        )
        if len(preview + reason) < 490:
            audit_details += _("Reason: {reason}").format(reason=reason)
        else:
            audit_details += _("Reason too long to be shown.")
    if not date:
        date = datetime.utcnow()

    fails = []
    for member in members:
        if not member:
            continue
        failure = await warn_member(member)
        if failure:
            fails.append(failure)
    # all good!
    return fails
async def _check_endwarn(self):
    """
    Go through the temporary actions of every guild and end the expired ones.

    For each stored temporary action, the start time and duration are read; once
    ``start + duration`` is in the past, a level 2 action is ended with
    :meth:`_unmute` and a level 5 action with ``guild.unban`` (followed by a
    reinvite DM if the guild enabled it). The entry is then removed from the
    cache, whether ending the action succeeded or not.

    Entries whose time data cannot be parsed, and mutes of members who are no
    longer in the guild, are removed without any further action.
    """

    async def reinvite(guild, user, reason, duration):
        """
        Send ``user`` a single-use invite to ``guild`` after their temporary ban ended.

        Every failure (no usable channel, invite creation error, DMs closed) is
        logged and otherwise ignored.
        """
        channel = next(
            (
                c  # guild.text_channels is already sorted by position
                for c in guild.text_channels
                if c.permissions_for(guild.me).create_instant_invite
            ),
            None,
        )
        if channel is None:
            # can't find a valid channel
            log.info(
                f"[Guild {guild.id}] Can't find a text channel where I can create an invite "
                f"when reinviting {user} (ID: {user.id}) after its unban."
            )
            return
        try:
            invite = await channel.create_invite(max_uses=1)
        except Exception as e:
            log.warning(
                f"[Guild {guild.id}] Couldn't create an invite to reinvite "
                f"{user} (ID: {user.id}) after its unban.",
                exc_info=e,
            )
            return
        try:
            await user.send(
                _(
                    "You were unbanned from {guild}, your temporary ban (reason: "
                    "{reason}) just ended after {duration}.\nYou can join back using this "
                    "invite: {invite}"
                ).format(guild=guild.name, reason=reason, duration=duration, invite=invite)
            )
        except discord.errors.Forbidden:
            # couldn't send message to the user, quite common
            log.info(
                f"[Guild {guild.id}] Couldn't reinvite member {user} "
                f"(ID: {user.id}) after its temporary ban."
            )

    now = datetime.utcnow()
    for guild in self.bot.guilds:
        data = await self.cache.get_temp_action(guild)
        if not data:
            continue
        to_remove = []
        # iterate over a snapshot: we await inside the loop, and the mapping must not
        # change size under us if the cache hands out its own dict
        for member_id, action in list(data.items()):
            member_id = int(member_id)
            try:
                taken_on = self._get_datetime(action["time"])
                duration = self._get_timedelta(action["duration"])
            except ValueError as e:
                log.error(
                    f"[Guild {guild.id}] Time or duration cannot be fetched. This is "
                    "probably leftovers from the conversion of post 1.3 data. Removing the "
                    f"temp warning, not taking actions... Member: {member_id}, data: {action}",
                    exc_info=e,
                )
                to_remove.append(UnavailableMember(self.bot, guild._state, member_id))
                continue
            author_id = action["author"]
            author = guild.get_member(author_id)
            # the moderator may have left the guild: fall back on the stored ID
            requested_by = author if author else author_id
            member = guild.get_member(member_id)
            case_reason = action["reason"]
            level = action["level"]
            action_str = _("mute") if level == 2 else _("ban")
            if not member:
                member = UnavailableMember(self.bot, guild._state, member_id)
                if level == 2:
                    # the member left the guild, there is no role to remove anymore
                    to_remove.append(member)
                    continue
            if (taken_on + duration) >= now:
                # not over yet
                continue
            # end of warn
            roles = list(filter(None, [guild.get_role(x) for x in action.get("roles") or []]))
            reason = _(
                "End of timed {action} of {member} requested by {author} that lasted "
                "for {time}. Reason of the {action}: {reason}"
            ).format(
                action=action_str,
                member=member,
                author=requested_by,
                time=self._format_timedelta(duration),
                reason=case_reason,
            )
            try:
                if level == 2:
                    await self._unmute(member, reason=reason, old_roles=roles)
                if level == 5:
                    await guild.unban(member, reason=reason)
                    if await self.data.guild(guild).reinvite():
                        await reinvite(
                            guild,
                            member,
                            case_reason,
                            self._format_timedelta(timedelta(seconds=action["duration"])),
                        )
            except discord.errors.Forbidden:
                log.warning(
                    f"[Guild {guild.id}] I lost required permissions for "
                    f"ending the timed {action_str}. Member {member} (ID: {member.id}) "
                    "will stay as it is now."
                )
            except discord.errors.HTTPException as e:
                log.warning(
                    f"[Guild {guild.id}] Couldn't end the timed {action_str} of {member} "
                    f"(ID: {member.id}). He will stay as it is now.",
                    exc_info=e,
                )
            else:
                log.debug(
                    f"[Guild {guild.id}] Ended timed {action_str} of {member} (ID: "
                    f"{member.id}) taken on {self._format_datetime(taken_on)} requested "
                    f"by {requested_by} (ID: {author_id}) that lasted for "
                    f"{self._format_timedelta(duration)} for the reason {case_reason}"
                    f"\nCurrent time: {now}\nExpected end time of warn: "
                    f"{self._format_datetime(taken_on + duration)}"
                )
            # the action is dropped even on failure, it will not be retried
            to_remove.append(member)
        if to_remove:
            await self.cache.bulk_remove_temp_action(guild, to_remove)
async def _loop_task(self):
    """
    Endless background task ending the temporary warns (mutes and bans).

    Once the bot is ready, :meth:`_check_endwarn` is called, then the task sleeps
    10 seconds and starts over. Any exception raised by a check is logged; at the
    third one, the task logs a critical message and returns for good.
    """
    await self.bot.wait_until_ready()
    log.debug(
        "Starting infinite loop for unmutes and unbans. Canel the "
        'task with bot.get_cog("WarnSystem").task.cancel()'
    )
    failures = 0  # total since the task started, never reset
    while True:
        try:
            await self._check_endwarn()
        except Exception as exc:
            failures += 1
            if failures < 3:
                log.error(
                    "Error in loop for unmutes and unbans. The loop will be resumed.",
                    exc_info=exc,
                )
            else:
                # third error in our loop, let's shut it down
                log.critical(
                    "The loop for unmutes and unbans encountered a third error. To prevent "
                    "more damages, the loop will be cancelled. Timed mutes and bans no longer "
                    "works for now. Reload the cog to start the loop back. If the problem "
                    "persists, report the error and update the cog.",
                    exc_info=exc,
                )
                return
        await asyncio.sleep(10)
# automod stuff
def enable_automod(self):
    """
    Enable automod checks and listeners on the bot.

    Registers :meth:`automod_on_message` as an ``on_message`` listener and starts
    :meth:`automod_warn_loop` as a task, kept in ``self.automod_warn_task``.
    """
    log.info("Enabling automod listeners and event loops.")
    self.bot.add_listener(self.automod_on_message, name="on_message")
    # NOTE(review): automod_on_message_edit is not registered here, confirm whether it is
    # wired somewhere else or left out on purpose
    self.automod_warn_task = self.bot.loop.create_task(self.automod_warn_loop())
def disable_automod(self):
    """
    Disable automod checks and listeners on the bot.

    Removes the ``on_message`` listener and cancels the task stored in
    ``self.automod_warn_task``, if there is one.
    """
    log.info("Disabling automod listeners and event loops.")
    self.bot.remove_listener(self.automod_on_message, name="on_message")
    # the attribute only exists once the automod was enabled
    task = getattr(self, "automod_warn_task", None)
    if task is not None:
        task.cancel()
async def _check_if_automod_valid(self, message: discord.Message):
    """
    Tell if the automod should process the given message.

    Returns ``False`` for messages outside of a guild, sent by a bot or by the guild
    owner, when the automod is not enabled for the guild, or when the bot reports
    the message as automod immune or its author as a mod. Returns ``True`` otherwise.
    """
    guild, member = message.guild, message.author
    # cheap local checks first, the awaited ones come last
    if not guild or member.bot or guild.owner_id == member.id:
        return False
    if not self.cache.is_automod_enabled(guild):
        return False
    immune = await self.bot.is_automod_immune(message)
    if immune:
        return False
    return not await self.bot.is_mod(member)
async def automod_on_message(self, message: discord.Message):
    """
    Listener running the regex and antispam automod on a new message.

    Both processors run concurrently; an exception raised by one of them is logged
    and does not prevent the other from completing.
    """
    should_process = await self._check_if_automod_valid(message)
    if not should_process:
        return
    # gather keeps the order of its arguments, each slot is None or the raised exception
    outcomes = await asyncio.gather(
        self.automod_process_regex(message),
        self.automod_process_antispam(message),
        return_exceptions=True,
    )
    regex_exception, antispam_exception = outcomes
    if regex_exception:
        log.error(
            f"[Guild {message.guild.id}] Error while processing message for regex automod.",
            exc_info=regex_exception,
        )
    if antispam_exception:
        log.error(
            f"[Guild {message.guild.id}] Error while processing message for antispam system.",
            exc_info=antispam_exception,
        )
async def automod_on_message_edit(self, before: discord.Message, after: discord.Message):
    """
    Listener running the regex automod on the new content of an edited message.

    Only ``after`` is looked at. Exceptions raised while processing are logged.
    """
    if await self._check_if_automod_valid(after):
        try:
            await self.automod_process_regex(after)
        except Exception as error:
            log.error(
                f"[Guild {after.guild.id}] Error while "
                "processing edited message for regex automod.",
                exc_info=error,
            )
async def _safe_regex_search(self, regex: re.Pattern, message: discord.Message):
    """
    Mostly safe regex search to prevent reDOS from user defined regex patterns

    ``regex.findall`` is submitted to the process pool ``self.re_pool`` with the
    message content. Waiting for its result (up to ``self.regex_timeout`` seconds)
    happens in the default executor to keep things asynchronous, and that wait is
    itself capped at ``self.regex_timeout + 5`` seconds.

    Returns a tuple ``(keep, matches)``:

    * ``(True, matches)`` when the search completed
    * ``(False, [])`` when one of the two timeouts was hit (a warning is logged)
    * ``(True, [])`` when any other exception occured (an error is logged)

    This function was fully made by TrustyJAID for Trusty-cogs/retrigger (amazing cog btw)
    https://github.com/TrustyJAID/Trusty-cogs/blob/f08a88040dcc67291a463517a70dcbbe702ba8e3/retrigger/triggerhandler.py#L494
    """
    guild = message.guild
    try:
        pending = self.re_pool.apply_async(regex.findall, (message.content,))
        blocking_get = functools.partial(pending.get, timeout=self.regex_timeout)
        future = self.bot.loop.run_in_executor(None, blocking_get)
        matches = await asyncio.wait_for(future, timeout=self.regex_timeout + 5)
    except TimeoutError:
        # we certainly don't want to be performing multiple triggers if this happens
        log.warning(
            f"[Guild {guild.id}] Automod: regex process took too long. "
            f"Removing from memory. Offending regex: {regex.pattern}"
        )
        return False, []
    except asyncio.TimeoutError:
        log.warning(
            f"[Guild {guild.id}] Automod: regex asyncio timed out. "
            f"Removing from memory. Offending regex: {regex.pattern}"
        )
        return False, []
    except Exception:
        log.error(
            f"[Guild {guild.id}] Automod regex encountered an error with {regex.pattern}",
            exc_info=True,
        )
        return True, []
    return True, matches
async def automod_process_regex(self, message: discord.Message):
    """
    Check the message against every automod regex of its guild and warn on a match.

    Each pattern is run through :meth:`_safe_regex_search`. A pattern that timed out
    is removed from the cache. When a pattern matches, the author of the message is
    warned by the bot with the level, reason and duration stored with the pattern;
    the reason is formatted with ``guild``, ``channel`` and ``member``. The result of
    each warn is logged.
    """
    guild = message.guild
    member = message.author
    all_regex = await self.cache.get_automod_regex(guild)
    # iterate over a snapshot: entries can be removed from the cache inside the loop, which
    # would break the iteration if the cache hands out its own dict
    for name, regex in list(all_regex.items()):
        keep, matches = await self._safe_regex_search(regex["regex"], message)
        if not matches:
            if keep is False:
                # the search timed out, don't run this pattern again
                await self.cache.remove_automod_regex(guild, name)
            continue
        time = self._get_timedelta(regex["time"]) if regex["time"] else None
        level = regex["level"]
        reason = regex["reason"].format(guild=guild, channel=message.channel, member=member)
        fail = await self.warn(guild, [member], guild.me, level, reason, time)
        if fail:
            log.warning(
                f"[Guild {guild.id}] Regex automod warn on member {member} ({member.id})\n"
                f"Level: {level}. Time: {time}. Reason: {reason}\n"
                f"Original message: {message.content}\n"
                f"Automatic warn failed due to the following exception:",
                exc_info=fail[0],
            )
        else:
            log.info(
                f"[Guild {guild.id}] Regex automod warn on member {member} ({member.id})\n"
                f"Level: {level}. Time: {time}. Reason: {reason}\n"
                f"Original message: {message.content}"
            )
async def automod_process_antispam(self, message: discord.Message):
    """
    Feed a message to the antispam system and react if its author is spamming.

    Data is stored in ``self.antispam``, keys are as follow:
    GUILD_ID > CHANNEL_ID > MEMBER_ID = tuple ``(messages, warned)`` where
    ``messages`` is the list of timestamps of the recent messages and ``warned`` is
    ``False`` or the time of the last antispam trigger.

    When more than ``max_messages`` timestamps remain within ``delay`` seconds:

    * if the member wasn't warned yet, or was warned more than
      ``delay_before_action`` seconds ago, a message is sent in the chat (text warn)
    * if the member was already warned within ``delay_before_action`` seconds, an
      actual warn is queued in ``self.antispam_warn_queue``

    Nothing is done if the antispam is disabled for the guild or if the message
    contains a whitelisted word.
    """
    guild = message.guild
    channel = message.channel
    member = message.author
    antispam_data = await self.cache.get_automod_antispam(guild)
    if antispam_data is False:
        return
    if any(word in message.content for word in antispam_data["whitelist"]):
        return

    # we slowly go across each key, if it doesn't exist, data is created then the
    # function ends since there's no data to check
    InitialData = namedtuple("InitialData", ["messages", "warned"])
    try:
        guild_data = self.antispam[guild.id]
    except KeyError:
        empty = InitialData(messages=[], warned=False)
        self.antispam[guild.id] = {channel.id: {member.id: empty}}
        return
    try:
        channel_data = guild_data[channel.id]
    except KeyError:
        guild_data[channel.id] = {member.id: InitialData(messages=[], warned=False)}
        return
    data = channel_data.get(member.id, InitialData(messages=[], warned=False))

    data.messages.append(message.created_at)
    # now we've got our list of timestamps, just gotta clean the old ones
    data = data._replace(
        messages=self._automod_clean_old_messages(
            antispam_data["delay"], message.created_at, data.messages
        )
    )
    if len(data.messages) <= antispam_data["max_messages"]:
        # antispam not triggered, we can exit now
        channel_data[member.id] = data
        return

    # at this point, user is considered to be spamming
    # we either send a text warn or queue an actual warnsystem warn
    last_warned = data.warned
    # compare two Discord timestamps: mixing them with the local clock (datetime.now)
    # shifts the result by the UTC offset of the host
    if (
        last_warned is False
        or (message.created_at - last_warned).total_seconds()
        > antispam_data["delay_before_action"]
    ):
        # never warned, or the last warn is too old to count: only a reminder in chat
        bot_message = await channel.send(
            _("{member} you're sending messages too fast!").format(member=member.mention),
            delete_after=5,
        )
        data = InitialData(messages=[], warned=bot_message.created_at)
    else:
        # already warned once within delay_before_action, gotta take actions
        # work on a copy so the cached settings keep their raw values for the next trigger
        warn_data = dict(antispam_data["warn"])
        warn_data["author"] = guild.me
        if warn_data["time"]:
            warn_data["time"] = self._get_timedelta(warn_data["time"])
        self.antispam_warn_queue.setdefault(guild.id, {})[member] = warn_data
        # also reset the data
        data = InitialData(messages=[], warned=message.created_at)
    # the nested dicts are looked up again since we may have awaited in between
    self.antispam.setdefault(guild.id, {}).setdefault(channel.id, {})[member.id] = data
def _automod_clean_old_messages(self, delay: int, current_time: datetime, messages: list):
    """
    Filter out the timestamps that are too old to be kept in the cache.

    Returns a new list with the items of ``messages`` that are at most ``delay``
    seconds older than ``current_time``, in the same order. ``messages`` itself
    is left untouched.
    """
    return [
        sent_at for sent_at in messages if (current_time - sent_at).total_seconds() <= delay
    ]
def _automod_clean_cache(
    self, guild: discord.Guild, channel: discord.TextChannel, member: discord.Member
):
    """
    We quickly end up with a dict filled with empty values, we gotta clean that.

    Removes the antispam entry of ``member`` in ``channel``, then the channel and
    guild levels if they are left empty. Raises ``KeyError`` if there is no entry
    for this member.
    """
    # NOTE(review): this used to read self.automod, but the GUILD_ID > CHANNEL_ID > MEMBER_ID
    # data is written to self.antispam by the antispam system, confirm against __init__
    guild_data = self.antispam[guild.id]
    channel_data = guild_data[channel.id]
    del channel_data[member.id]
    if not channel_data:
        del guild_data[channel.id]
    if not guild_data:
        del self.antispam[guild.id]
async def automod_check_for_autowarn(
    self, guild: discord.Guild, member: discord.Member, author: discord.Member, level: int
):
    """
    Iterate through member's modlog, looking for possible automatic warns.

    Level is the last warning's level, which will filter a lot of possible autowarns and,
    therefore, save performances.

    This can be a heavy call if there are a lot of possible autowarns and a long modlog.

    Exceptions raised by the check are logged, not propagated. If the check took more
    than 10 seconds, a warning is logged, once per guild until reload.
    """
    started = datetime.now()
    try:
        await self._automod_check_for_autowarn(guild, member, author, level)
    except Exception as e:
        log.error(f"[Guild {guild.id}] A problem occured with automod check.", exc_info=e)
    time_taken: timedelta = datetime.now() - started
    if time_taken.total_seconds() > 10 and guild.id not in self.warned_guilds:
        # remember the guild so this is only reported once
        self.warned_guilds.append(guild.id)
        log.warning(
            f"[Guild {guild.id}] Automod check took a long time! Time taken: {time_taken}\n"
            "Try to reduce the amount of warns/autowarns or blame Laggron for poorly "
            "written code (second option is preferred).\nThis warning will not show again "
            "for this guild until reload."
        )
async def _automod_check_for_autowarn(
    self, guild: discord.Guild, member: discord.Member, author: discord.Member, level: int
):
    """
    Prevents having to put this whole function into a try/except block.

    Reads the modlog of ``member`` from the most recent case to the oldest and counts,
    for each configured autowarn, the cases that fall within its time limit. An
    autowarn is triggered when the count is exactly its ``number``; it is discarded
    if the count goes above. Each triggered autowarn results in a warn performed by
    the bot, whose outcome is logged.

    Nothing is done if the automod is disabled, if the member is automod immune, if
    the member has less than 2 cases or if no autowarn applies to ``level``.
    """
    if not self.cache.is_automod_enabled(guild):
        return
    # starting the iteration through warnings can cost performances
    # so we look for conditions that confirms the member cannot be affected by automod
    if await self.bot.is_automod_immune(member):
        return
    warns = await self.get_all_cases(guild, member)
    if len(warns) < 2:
        return  # autowarn can't be triggered with a single warning in the modlog
    autowarns = await self.data.guild(guild).automod.warnings()

    # remove all autowarns that are locked to a specific level
    # where the last warning's level doesn't correspond
    # also remove autowarns that are automod only if warn author isn't the bot
    def is_autowarn_valid(warn):
        if author.id != self.bot.user.id and warn["automod_only"]:
            return False
        return warn["level"] == 0 or warn["level"] == level

    autowarns = list(filter(is_autowarn_valid, autowarns))
    if not autowarns:
        return  # no autowarn to iterate through

    for autowarn in autowarns:
        # prepare for iteration
        # if the condition is met (within the specified time? not an automatic warn?)
        # we increase this value until reaching the given limit
        autowarn["count"] = 0
        if autowarn["time"]:
            autowarn["until"] = datetime.utcnow() - timedelta(seconds=autowarn["time"])

    # each autowarn keeps the position it had in the filtered list as identifier, so
    # dropping one of them during the iteration never changes the key of the others
    active = list(enumerate(autowarns))
    found_warnings = {}  # identifier: warn to perform, there can be more than 1
    for warn in warns[::-1]:
        taken_on = datetime.fromtimestamp(warn["time"])
        still_active = []
        for index, autowarn in active:
            if "until" in autowarn and autowarn["until"] >= taken_on:
                # duration expired: stop counting, an already found warn is kept
                continue
            autowarn["count"] += 1
            if autowarn["count"] == autowarn["number"]:
                found_warnings[index] = autowarn["warn"]
            elif autowarn["count"] > autowarn["number"]:
                # value exceeded, no need to continue, it's already done for this one warn
                found_warnings.pop(index, None)
                continue
            still_active.append((index, autowarn))
        active = still_active
        if not active:
            # we could be out of autowarns to check after a certain time
            # no need to continue the iteration
            break

    for i, warn in found_warnings.items():
        try:
            await self.warn(
                guild,
                members=[member],
                author=guild.me,
                level=warn["level"],
                reason=warn["reason"],
                time=self._get_timedelta(warn["duration"]) if warn["duration"] else None,
            )
        except Exception as e:
            log.error(
                f"[Guild {guild.id}] Failed to perform automod warn on member {member} "
                f"({member.id}). Needed to perform automatic warn {i}",
                exc_info=e,
            )
        else:
            log.debug(
                f"[Guild {guild.id}] Successfully performed automatic "
                f"warn {i} on member {member} ({member.id})."
            )
async def automod_warn_loop(self):
    """
    Endless background task performing the warns queued by the antispam.

    Since this is asyncronous code, sometimes there can be too many warnings performed,
    especially with message antispam, since it treats multiple messages simultaneously.
    Instead, we have a dict of warnings to perform (``self.antispam_warn_queue``,
    GUILD_ID > member = warn data), and an entry is removed only once the warn is
    done. That way, duplicate warnings won't happen.

    The queue is processed, then the task sleeps 1 second and starts over. At the
    third exception raised while processing, the task logs a critical message and
    returns for good.
    """

    async def perform_warn(member: discord.Member, data: dict):
        # warn a single member, then remove it from the queue, whatever the outcome
        guild = member.guild
        try:
            await self.warn(guild, [member], **data)
        except Exception as e:
            log.error(
                f"Cannot perform autowarn on member {member} ({member.id}). Data: {data}",
                exc_info=e,
            )
        finally:
            # requests added for this member until the end of this delay are dropped as well
            await asyncio.sleep(1)
            del self.antispam_warn_queue[guild.id][member]
            if not self.antispam_warn_queue[guild.id]:
                del self.antispam_warn_queue[guild.id]

    async def process_queue():
        # the coroutines are created before anything is awaited, so the queue can't
        # change while we iterate over it
        coros = [
            perform_warn(member, value)
            for guild_queue in self.antispam_warn_queue.values()
            for member, value in guild_queue.items()
        ]
        await asyncio.gather(*coros, return_exceptions=True)

    failures = 0  # total since the task started, never reset
    while True:
        try:
            await process_queue()
        except Exception as e:
            failures += 1
            if failures >= 3:
                # third error in our loop, let's shut it down
                log.critical(
                    "The loop for automod warnings encountered a third error. To prevent "
                    "more damages, the loop will be cancelled. Automod antispam warnings "
                    "will no longer be performed for now. Reload the cog to start the loop "
                    "back. If the problem persists, report the error and update the cog.",
                    exc_info=e,
                )
                return
            log.error(
                "Error in loop for automod warnings. The loop will be resumed.", exc_info=e
            )
        await asyncio.sleep(1)