-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
144 lines (126 loc) · 4.79 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import os, nextcord, csv
from nextcord.ext import commands, tasks
from datetime import datetime
import motor.motor_asyncio as moto
import dotenv
from utils.mongo_utils import (
get_collection,
add_user,
get_user,
attendance_add,
attendance_remove,
get_all_attendance_prns,
)
from events.socrative_attendance import mark_attendance
# Timestamp captured once, when this module is imported.
current_time = datetime.now()
formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")

# Discord token, MongoDB URI and channel ids are read from the environment
# after python-dotenv has loaded its settings.
dotenv.load_dotenv()
token = os.getenv("DISCORD_TOKEN")
mongo_uri = os.getenv("MONGO_URI")

# These two stay as raw strings (None when unset); they are converted with
# int() at the point where they are compared against a channel id.
attendance_channel = os.getenv("ATTENDANCE_CHANNEL")
attendance_removal_channel = os.getenv("ATTENDANCE_REMOVAL_CHANNEL")

# The announcement channel id is converted eagerly, so a missing value must be
# reported clearly instead of surfacing as "int() argument must be ... NoneType".
_announcement_channel_raw = os.getenv("ANNOUNCEMENT_CHANNEL")
if not _announcement_channel_raw:
    raise RuntimeError(
        "ANNOUNCEMENT_CHANNEL is not set; define it in the environment or the .env file."
    )
announcement_channel = int(_announcement_channel_raw)
class Chotu(commands.Cog):
    """Core cog: timetable announcements, attendance channels and lifecycle logging.

    * ``my_task`` polls ``./config/schedule.csv`` and, when a row matches the
      current weekday, hour and minute, announces the room and calls
      ``mark_attendance`` for it.
    * ``on_message`` treats a message posted in the attendance channel or the
      attendance-removal channel as a request to add or remove the author's
      attendance.
    * The remaining listeners set the presence, log lifecycle events and
      report command errors.
    """

    def __init__(self, bot):
        """Store the bot and start the schedule polling loop."""
        self.bot = bot
        # "YYYY-MM-DD HH:MM" stamp of the last minute the schedule was
        # processed for; used to avoid handling the same minute twice.
        self._last_checked_minute = None
        self.my_task.start()

    def cog_unload(self):
        """Stop the polling loop when the cog is removed."""
        self.my_task.cancel()

    @staticmethod
    def _rooms_due(now):
        """Return the rooms whose schedule row matches *now* (weekday, hour, minute).

        Rows that lack a ``day``/``time``/``room`` value or whose ``time`` is
        not ``HH:MM`` are reported and skipped so one bad row cannot stop the
        polling loop. The file is closed before this function returns.
        """
        today = now.strftime("%A").lower()
        due = []
        with open("./config/schedule.csv", "r", newline="") as schedule_file:
            for row in csv.DictReader(schedule_file):
                try:
                    slot = datetime.strptime(row["time"], "%H:%M")
                    day = row["day"].lower()
                    room = row["room"]
                except (KeyError, TypeError, ValueError, AttributeError) as exc:
                    print(f"Skipping malformed schedule row {row!r}: {exc}")
                    continue
                if day == today and (slot.hour, slot.minute) == (now.hour, now.minute):
                    due.append(room)
        return due

    @tasks.loop(seconds=59)
    async def my_task(self):
        """Announce every room scheduled for the current minute and mark attendance."""
        now = datetime.now()

        # A 59-second period guarantees at least one tick per minute, but it
        # also means two ticks occasionally land in the same minute (e.g. at
        # :00 and :59). Process each wall-clock minute only once.
        minute_stamp = now.strftime("%Y-%m-%d %H:%M")
        if minute_stamp == self._last_checked_minute:
            return
        self._last_checked_minute = minute_stamp

        channel = self.bot.get_channel(announcement_channel)
        for room in self._rooms_due(now):
            print(f"It's time for room {room}!")
            if channel is None:
                # get_channel returned nothing for the configured id; still
                # mark attendance rather than failing on channel.send.
                print(
                    f"Announcement channel {announcement_channel} not found; "
                    "skipping announcement."
                )
            else:
                await channel.send(f"It's time for room {room}!")
            await mark_attendance(room)

    @my_task.before_loop
    async def _before_my_task(self):
        """Delay the first tick until the bot is ready, so channels can be looked up."""
        await self.bot.wait_until_ready()

    async def _update_attendance(self, message, update, outcome):
        """Apply *update* to the message author's attendance and confirm by DM.

        ``update`` is the coroutine function to call with the author's id
        (``attendance_add`` or ``attendance_remove``); ``outcome`` is the word
        inserted into the confirmation ("marked" / "removed"). When
        ``get_user`` returns ``None`` the author is asked to register instead.
        """
        user = message.author.id
        if await get_user(user) is None:
            await message.channel.send(
                f"Hey {message.author.mention}! You are not registered with me. Please register yourself using `/register` command."
            )
            return
        await update(user)
        await message.author.send(
            f"Hey {message.author.mention}! Your attendance has been {outcome} for the day."
        )

    # Attendance is driven by messages posted in two dedicated channels.
    @commands.Cog.listener()
    async def on_message(self, message):
        """Add or remove attendance depending on the channel the message was posted in."""
        # Ignore the bot's own messages.
        if message.author == self.bot.user:
            return
        channel_id = message.channel.id
        if channel_id == int(attendance_channel):
            await self._update_attendance(message, attendance_add, "marked")
        elif channel_id == int(attendance_removal_channel):
            await self._update_attendance(message, attendance_remove, "removed")

    # Greetings
    @commands.Cog.listener()
    async def on_ready(self):
        """Set the bot's presence and print a login banner."""
        # Use the cog's own bot reference instead of a module-level global.
        await self.bot.change_presence(
            activity=nextcord.Activity(
                type=nextcord.ActivityType.watching, name="Kya dekh Raha he bsdk"
            )
        )
        # Computed here so the banner shows when the bot became ready, not
        # when the module was imported.
        ready_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print("\n\tLogged in as...\t")
        print(
            f"\n> NAME: {self.bot.user} \n> ID : {self.bot.user.id}\n> AT TIME : {ready_time}"
        )

    # Reconnect
    @commands.Cog.listener()
    async def on_resumed(self):
        """Log that the gateway session was resumed."""
        print("Bot has reconnected!")

    # Error Handlers
    @commands.Cog.listener()
    async def on_command_error(self, ctx, error):
        """Report command errors to the invoking channel or the console."""
        # Unknown command name
        if isinstance(error, commands.CommandNotFound):
            await ctx.send("Invalid Command!")
        # Missing permissions
        # NOTE(review): MissingPermissions appears to describe the invoking
        # user, not the bot (that would be BotMissingPermissions) - confirm
        # which one this message is meant for.
        elif isinstance(error, commands.MissingPermissions):
            await ctx.send("Bot Permission Missing!")
        else:
            # NOTE(review): with this listener registered the library's default
            # error output appears to be suppressed, so log anything not
            # handled above instead of dropping it silently.
            print(f"Unhandled command error: {error!r}")
# Gateway intents: the library defaults plus members, presences and message
# content, all switched on explicitly.
intents = nextcord.Intents.default()
intents.members = intents.presences = intents.message_content = True

# The bot instance; commands are triggered by an @mention or the "!" prefix.
bot = commands.Bot(
    description="Chotu",
    intents=intents,
    command_prefix=commands.when_mentioned_or("!"),
)
# Entry point: set up the MongoDB client, load every command extension,
# register the core cog and run the bot.
if __name__ == "__main__":
    # Clear the terminal. "clear" is not available on Windows, where the
    # equivalent command is "cls".
    os.system("cls" if os.name == "nt" else "clear")

    # Set up the MongoDB client and database handle on the bot.
    print("Connecting to MongoDB...")
    try:
        bot.mongo = moto.AsyncIOMotorClient(mongo_uri)
        bot.db = bot.mongo["Chotu"]
        # NOTE(review): this expression's result is discarded; it looks like a
        # no-op collection lookup - confirm before removing.
        bot.db["students"]
        # NOTE(review): nothing above awaits a server round trip, so this
        # presumably confirms client creation only, not a live connection.
        print("Connected to MongoDB!")
    except Exception as e:
        # Best effort: the bot still starts when the client cannot be created.
        print("Unable to connect to MongoDB!")
        print(e)

    # Load every module in ./commands as an extension. Sorting makes the load
    # order deterministic instead of depending on directory listing order.
    for filename in sorted(os.listdir("commands")):
        if filename.endswith(".py"):
            bot.load_extension(f"commands.{filename[:-3]}")

    bot.add_cog(Chotu(bot))
    bot.run(token, reconnect=True)