-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
4619 lines (4080 loc) · 250 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Files
from system.config import TOKEN, NAME, API_KEY, sys_security, gen_config, gen_config2, HUGGING_FACE_API, Image_Model, DEFAULT_MUSIC_MODEL, history_limit, limit_history, \
show_time, history_channel_toggle, embed_colors, Object_Detection_Model, show_tokens_at_startup, fix_repeating_prompts, safe_search, ffmpeg_executable_path, tts_toggle, \
vc_voice, VOICES, sync_voice_with_text, HISTORY_FILE, smart_recognition, show_invite_link_on_startup, safegen, discord_heartbeat_timeout, mod_channel_name, \
preview_code_output, additional_details, model_name, preview_model_name, model_temperature, create_mod_channel, show_tokens, add_watermark_to_generated_image, \
show_safety_settings_on_startup, Dangerous, Harassment, Hate_Speech, Sexually_Explicit, Dangerous_Content, vc_AI, web_search, GOOGLE_CUSTOM_SEARCH_API_KEY, \
GOOGLE_PROJECT_SEARCH_ENGINE_ID, advanced_model
from system.instructions.instruction import ins, video_ins, file_ins, insV, insV2, fix_mem_ins, cool_ins
from system.instructions.instruction_ru import ru_ins, ru_video_ins, ru_file_ins, ru_insV, ru_insV2, ru_fix_mem_ins
from system.instructions.instruction_eg import eg_ar_ins, eg_fix_mem_ins
from system.instructions.instruction_fr import fr_ins, fr_video_ins, fr_file_ins, fr_insV, fr_insV2, fr_fix_mem_ins
from system.instructions.instruction_es import es_ins, es_video_ins, es_file_ins, es_fix_mem_ins
from system.instructions.instruction_de import de_ins, de_video_ins, de_file_ins, de_insV, de_insV2, de_fix_mem_ins
from system.instructions.instruction_ar import ins_ar, ar_fix_mem_ins
from system.instructions.instruction_tutor_mode import tutor_ins
import system.check_tokens as check
from system.check_tokens import tokens
# Libraries
import discord
from discord.ext import commands
import google.generativeai as genai
import json
import os
import requests
from PIL import Image
from colorama import Fore, Style
import asyncio
import logging
import random
import time
import httpx
from discord.utils import get
import io
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api import TranscriptsDisabled
import urllib.parse as urlparse
import re
from urllib.parse import urlparse, parse_qs
import inspect
import docx
import pptx
import openpyxl
import datetime
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import edge_tts
import re
import shutil
from bs4 import BeautifulSoup
# Verify configured credentials before starting the bot.
discord_verified, gemini_api_key_verified, hugging_verified, google_search_api_key_verified, google_search_project_id_verified = tokens()
if not discord_verified:
    # Without a valid Discord token the bot cannot log in at all.
    exit()
# Web search needs both the API key and the search-engine (project) ID.
google_search_api_verified = bool(google_search_api_key_verified and google_search_project_id_verified)
# Set up the bot with the correct prefix and intents
intents = discord.Intents.default()
intents.members = True
intents.presences = True
intents.message_content = True
ffmpeg_path = ffmpeg_executable_path
bot = commands.Bot(command_prefix="/", intents=intents, heartbeat_timeout=discord_heartbeat_timeout)
dev_DEBUG = False
Model_Debug = False
# Ensure the log directory exists before configuring logging.
# (makedirs with exist_ok=True guarantees it, so no extra existence check is needed.)
log_dir = "system/log"
os.makedirs(log_dir, exist_ok=True)
# One log file per run; the strftime format uses '-' separators, which are
# safe in filenames on every platform (':' would break Windows paths).
timestamp = time.strftime('%Y-%m-%d_%H-%M-%S')
logging.basicConfig(filename=f"system/log/{timestamp}.log", level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(filename)s - %(lineno)d - %(message)s')
SEARCH_API_KEY = GOOGLE_CUSTOM_SEARCH_API_KEY
CX = GOOGLE_PROJECT_SEARCH_ENGINE_ID
def fetch_code_and_content(url):
    """
    Fetches detailed content and code snippets from a given URL.
    Tries to scrape the most relevant content, including code blocks.
    """
    try:
        response = requests.get(url, timeout=10)  # timeout avoids long delays
        if response.status_code != 200:
            return f"Error fetching page: {response.status_code}"
        soup = BeautifulSoup(response.content, 'html.parser')
        # Prefer a semantic container for the page's main content.
        main_content = (
            soup.find('article')
            or soup.find('section')
            or soup.find('main')
            or soup.find('div', {'id': 'content'})
            or soup.find('div', {'class': 'main-content'})
        )
        # Collect code blocks, capped at 200 characters each.
        code_snippets = [
            block.get_text(strip=True)[:200]
            for block in soup.find_all(['pre', 'code'])
        ]
        content = ""
        if main_content:
            # Only the first 5 paragraphs of the main content are kept.
            paragraphs = main_content.find_all('p')
            content = ' '.join(p.get_text() for p in paragraphs[:5])
        if code_snippets:
            content += '\nCode/Content Snippets:\n' + '\n'.join(code_snippets)
        return content.strip() if content.strip() else "Unable to retrieve meaningful content."
    except Exception as e:
        return f"Error fetching content from URL: {str(e)}"
def search_google(query, site=None, num_results=5, safe_search=safe_search):
    """
    Performs a Google Custom Search and returns the top results.
    Args:
        query (str): The search query.
        site (str, optional): The site to restrict the search to (e.g., "https://www.youtube.com/"). Default is None.
        num_results (int, optional): The number of results to fetch (the API caps this at 10). Default is 5.
        safe_search (bool, optional): Whether to enable SafeSearch. Defaults to the
            `safe_search` config value, captured at import time.
    Returns:
        list | str: A list of result dicts with "title", "link" and "snippet" keys,
        or the string "No results found." when the API returns no items, or
        "Error Searching the web." on a request failure.
        NOTE(review): callers must handle both the list and the string shapes.
    """
    safe_search_set = "active" if safe_search else "off"
    # Set up API request
    url = "https://www.googleapis.com/customsearch/v1"
    params = {
        "key": SEARCH_API_KEY,
        "cx": CX,
        "q": query,
        "num": min(num_results, 10),  # Google Custom Search API allows up to 10 results per request
        "safe": safe_search_set,
    }
    if site:
        params["siteSearch"] = site
    # Perform the request
    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        results = response.json()
        if "items" in results:
            return [
                {
                    "title": item.get("title"),
                    "link": item.get("link"),
                    "snippet": item.get("snippet"),
                }
                for item in results["items"]
            ]
        else:
            print("No results found.")
            return "No results found."
    except requests.exceptions.RequestException as e:
        print(f"Error during search request: {e}")
        return "Error Searching the web."
# Ensure the data and scratch directories exist (exist_ok avoids the
# check-then-create race of the previous exists()/makedirs() pattern).
os.makedirs('system/data', exist_ok=True)
os.makedirs('system/RAM', exist_ok=True)
async def send_message(channel, message, max_length=1999):
    """
    Send *message* to *channel*, split into chunks of at most *max_length* characters.

    Splitting prefers newline boundaries. Fixes over the previous version:
    a single line longer than max_length is now hard-split (it used to be
    emitted whole, exceeding Discord's 2000-character limit), and empty
    chunks are never sent (Discord rejects empty messages).
    """
    chunks = []
    current_chunk = ""
    for line in message.splitlines():
        # Hard-split lines that can never fit in one chunk.
        while len(line) > max_length:
            if current_chunk:
                chunks.append(current_chunk.strip())
                current_chunk = ""
            chunks.append(line[:max_length])
            line = line[max_length:]
        if current_chunk and len(current_chunk) + len(line) + 1 > max_length:
            chunks.append(current_chunk.strip())
            current_chunk = line
        else:
            if current_chunk:
                current_chunk += "\n"
            current_chunk += line
    if current_chunk:
        chunks.append(current_chunk.strip())
    for part in chunks:
        if part:  # never send an empty message
            await channel.send(part)
# Function to load conversation history from file
def load_history():
    """Load the saved conversation history, or return {} when the file is
    missing or corrupt (a truncated JSON file must not prevent startup)."""
    if os.path.exists(HISTORY_FILE):
        try:
            with open(HISTORY_FILE, 'r') as file:
                return json.load(file)
        except json.JSONDecodeError:
            logging.warning("History file %s is corrupt; starting with empty history.", HISTORY_FILE)
    return {}
# Initialize conversation history
conversation_history = load_history()
# Function to save conversation history to file
def save_history():
    """Persist the in-memory conversation history as pretty-printed JSON."""
    # Create the directory if it does not exist
    history_dir = os.path.dirname(HISTORY_FILE)
    os.makedirs(history_dir, exist_ok=True)
    with open(HISTORY_FILE, 'w') as history_file:
        json.dump(conversation_history, history_file, indent=4)
# Function to add a message to the conversation history
def add_to_history(member_name, message, channel_name=None):
    """Adds a message to the conversation history.

    Entries are stored as formatted strings (optionally timestamped) under a
    key that is either the explicit channel_name, the calling context's
    channel name (when per-channel history is enabled), or "Conversation".
    Persists to disk via save_history() on every call.
    """
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    # Get context object ('message', 'ctx.message', etc.) from calling function.
    # NOTE(review): this inspects the CALLER's local variables by name, so it
    # only works when the caller names its context 'message' or 'ctx'.
    frame = inspect.currentframe().f_back
    args, _, _, values = inspect.getargvalues(frame)
    context_obj = values.get('message', None)
    if context_obj is None:
        context_obj = values.get('ctx', None)
        if context_obj is not None:
            context_obj = context_obj.message
    # Use provided channel_name or get it from context
    if channel_name is None:
        if history_channel_toggle and context_obj is not None:
            user_id = context_obj.channel.name
        else:
            user_id = "Conversation"
    else:
        user_id = channel_name  # Use the provided channel_name
    if user_id not in conversation_history:
        conversation_history[user_id] = []
    if show_time:
        conversation_history[user_id].append(f"{timestamp} - {member_name}: {message}")
    else:
        conversation_history[user_id].append(f"{member_name}: {message}")
    # Truncate history if limit_history is True
    if limit_history and len(conversation_history[user_id]) > history_limit:
        conversation_history[user_id] = conversation_history[user_id][-history_limit:]
    save_history()
def add_to_history_bot(member_name, message, channel_name=None):
    """Adds a bot message to the conversation history.

    Mirrors add_to_history() but concatenates member_name and message
    directly (no ": " separator). Fix: now also truncates the history when
    limit_history is enabled — previously only add_to_history() truncated,
    so bot messages let the history grow without bound.
    """
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    # Get context object ('message', 'ctx.message', etc.) from calling function
    frame = inspect.currentframe().f_back
    args, _, _, values = inspect.getargvalues(frame)
    context_obj = values.get('message', None)
    if context_obj is None:
        context_obj = values.get('ctx', None)
        if context_obj is not None:
            context_obj = context_obj.message
    # Use provided channel_name or get it from context
    if channel_name is None:
        if history_channel_toggle and context_obj is not None:
            user_id = context_obj.channel.name
        else:
            user_id = "Conversation"
    else:
        user_id = channel_name  # Use the provided channel_name
    if user_id not in conversation_history:
        conversation_history[user_id] = []
    if show_time:
        conversation_history[user_id].append(f"{timestamp} - {member_name}{message}")
    else:
        conversation_history[user_id].append(f"{member_name}{message}")
    # Truncate history if limit_history is True (consistency with add_to_history)
    if limit_history and len(conversation_history[user_id]) > history_limit:
        conversation_history[user_id] = conversation_history[user_id][-history_limit:]
    save_history()
# Record the restart in the shared "Conversation" history. Skipped when
# per-channel histories are enabled (there is no single channel to log to).
if not history_channel_toggle:
    add_to_history("System", "You have been rebooted!")
async def unnecessary_error(e):
    """Return True when *e* looks like a Google-internal 500 error, i.e. one
    that callers may treat as transient/ignorable."""
    return "500" in str(e)
async def debug_error(e, message, channel):
    """Classify *e* by HTTP status substring, log it, record it in the
    conversation history, and notify *channel* with a friendly message.

    Args:
        e: the caught exception.
        message: short description of what the bot was doing (used in logs).
        channel: destination for the user-facing notification.

    Note: the 500 branch only logs (callers retry); all other branches also
    message the channel. Typos fixed in log output ("finnish", "occured").
    """
    error_message = str(e)
    if "500" in error_message:
        logging.error(f"Google Internal Error while {message}: {e}")
        add_to_history("Google Internal Error", "💥 An Internal Error has occurred, Retrying...")
        print(f"Google Error while {message}: {e}")
    elif "503" in error_message:
        logging.warning(f"Error (Temporarily overloaded or down service) While {message}: {e}")
        add_to_history("Error", "The service may be temporarily overloaded or down. Please try again later.")
        print(f"Error (Temporarily overloaded or down service) While {message}: {e}")
        await channel.send("⏳ The service may be temporarily overloaded or down. Please try again later.")
    elif "403" in error_message:
        logging.error(f"Error (API Key Denied Permissions) While {message}: {e}")
        add_to_history("Error", "Your API key doesn't have the required permissions.")
        await channel.send("🔒 Your API key has denied permissions.")
        print(f"Error (API Key Denied Permissions) While {message}: {e}")
    elif "504" in error_message:
        logging.warning(f"Error (Service Unable to finish processing within the deadline) While {message}: {e}")
        add_to_history("Error", "The service is unable to finish processing within the deadline.")
        await channel.send("⏳ The service is unable to finish processing within the deadline.")
        print(f"Error (Service Unable to finish processing within the deadline) While {message}: {e}")
    elif "429" in error_message:
        logging.warning(f"Error (Service rate limited) While {message}: {e}")
        add_to_history("Error", "The service is being rate limited.")
        await channel.send("🚫 You've exceeded the rate limit, Please try again later.")
        print(f"Error (Service rate limited) While {message}: {e}")
    else:
        logging.error(f"An Error occurred while {message}: {e}")
        print(f"An Error occurred while {message}: {e}")
        add_to_history("Error", f"Error occurred while {message}: {error_message}")
        await channel.send("🚫 Uh oh! Something went wrong. We couldn't complete the request. Please try again.")
# Utility functions
def save_search(query, result):
    """Append a query/result pair to the saved-searches log file."""
    entry = f'{query}: {result} |\n'
    with open('system/data/saved-searches.py', 'a') as search_log:
        search_log.write(entry)
def save_memory(query, result):
    """Saves a memory entry to the core-memory JSON file.

    Loads the existing memory dict, adds/overwrites the *query* key with
    *result*, and writes the file back. Fix: a corrupt (unparseable) memory
    file no longer crashes the save — it is replaced with a fresh dict.
    """
    try:
        with open('system/data/core-memory.json', 'r') as f:
            memory = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        # Missing file: first write. Corrupt file: start over rather than crash.
        memory = {}
    # Add the new memory entry
    memory[query] = result
    # Save the updated memory
    with open('system/data/core-memory.json', 'w') as f:
        json.dump(memory, f, indent=4)
def load_memory(query=None):
    """Loads memory from the core-memory JSON file.

    Returns the value stored under *query*, the whole memory dict when no
    query is given, or {} when the memory file does not exist yet.
    """
    try:
        with open('system/data/core-memory.json', 'r') as memory_file:
            memory = json.load(memory_file)
    except FileNotFoundError:
        return {}
    return memory.get(query) if query else memory
def get_conversation_history(ctx=None):
    """Return the relevant conversation history as one newline-joined string.

    Uses the context's channel name as the key when per-channel history is
    enabled, otherwise the shared "Conversation" key.
    """
    use_channel_key = history_channel_toggle and ctx is not None
    key = ctx.channel.name if use_channel_key else "Conversation"
    return "\n".join(conversation_history.get(key, []))
api_key = f"{API_KEY}"  # coerced to str via f-string
name = f"{NAME}"
# Re-run the token validation from system.check_tokens at startup
check.tokens()
if show_tokens_at_startup:
    # NOTE(review): this prints raw secrets to the console; anyone capturing
    # the terminal output sees the tokens.
    print(" ")
    print(f"{Fore.WHITE + Style.BRIGHT + Style.DIM}API KEY:{Style.RESET_ALL} {Fore.MAGENTA + Style.BRIGHT}{api_key}{Style.RESET_ALL}")
    print(Fore.RED + Style.BRIGHT + "__________________________________________________________________________________")
    print(" ")
    print(f"{Fore.WHITE + Style.BRIGHT + Style.DIM}BOT TOKEN:{Style.RESET_ALL} {Fore.BLUE + Style.BRIGHT}{TOKEN}{Style.RESET_ALL}")
    print(Fore.RED + Style.BRIGHT + "__________________________________________________________________________________")
    print(" ")
    print(f"{Fore.WHITE + Style.BRIGHT + Style.DIM}HUGGING FACE API KEY:{Style.RESET_ALL} {Fore.YELLOW + Style.BRIGHT}{HUGGING_FACE_API}{Style.RESET_ALL}")
    print(" ")
# Global variable to store the member's custom name
member_custom_name = {}
@bot.tree.command(name="name", description="Change your custom name")
async def change_name(interaction: discord.Interaction, new_name: str):
    """Store a per-user display-name override in member_custom_name.

    The `global` declaration was removed: the dict is mutated in place,
    never rebound, so it is unnecessary.
    """
    if not new_name:  # Check for empty string
        await interaction.response.send_message("Please provide a name.", ephemeral=True)
    else:
        member_custom_name[interaction.user.id] = new_name
        await interaction.response.send_message(f"Your name has been changed to {new_name}.", ephemeral=True)
# Typo fix: "Defualt" -> "Default" in the console message.
print(f"Default Model: {preview_model_name}")
if show_safety_settings_on_startup:
    print(f"{Fore.GREEN + Style.BRIGHT}Safety Settings:{Style.RESET_ALL}")
    print(f"{Fore.CYAN + Style.BRIGHT}Dangerous Safety Set: {Fore.RESET}{Dangerous}")
    print(f"{Fore.CYAN + Style.BRIGHT}Harassment Safety Set: {Fore.RESET}{Harassment}")
    print(f"{Fore.CYAN + Style.BRIGHT}Hate Speech Safety Set: {Fore.RESET}{Hate_Speech}")
    print(f"{Fore.CYAN + Style.BRIGHT}Sexually Explicit Safety Set: {Fore.RESET}{Sexually_Explicit}")
    print(f"{Fore.CYAN + Style.BRIGHT}Dangerous Content Safety Set: {Fore.RESET}{Dangerous_Content}")
    print("")
# Capture the configured model name string NOW — the name `model_name` is
# re-assigned to a GenerativeModel instance further down in this file.
genai_model = model_name
if fix_repeating_prompts:
    ins = f"{ins}\n{fix_mem_ins}"
default_lang = "en"
# Append the shared "cool" instructions plus hard-coded prompt guidance to the
# main instruction set.
ins = f"{ins}\n{cool_ins}\nAnd if someone asks you what is the best model, tell them its Google Gemini Experimental 1206 because it outperforms all other models. even Google Gemini 1.5 Pro and Google Gemini Ultra and all openai's models, and it can handle alot and every complex task! It's an EXTREMELY advanced model! and ranks #1 on coding, #2 math, #1 on creative writing and #1 on Hard Prompts, Also if a user sends you a file attachment, dont use code to find the file\nalso! you can search youtube videos using `/search*yt [query]` and the same with searching the web! `/search [query]`! its that easy! so if a user asks to search something on youtube/the web, then do this! and instead of making codes to search the web, just use `/search [query]` to search the web! you can search the web with /search [query]! and same with youtube! /search*yt [query]!, and when you try doing text links like [a link](https://google.com), you cant do '[https://www.youtube.com/watch?v=ENrzD9HAZK4](https://www.youtube.com/watch?v=ENrzD9HAZK4)' but you can do '[a](https://www.youtube.com/watch?v=ENrzD9HAZK4)', so you cannot link a link but you can link a text. so if you want to give the link and visible without hiding it with a text, just put the link and without any () and []!\n"
if genai_model == "learnlm-1.5-pro-experimental":
    # BUG FIX: these assignments previously ended with `(...,),`, which made
    # `tutor_ins` a tuple nested inside a tuple instead of a plain string —
    # the tutor system instruction was therefore never a usable string.
    # The long guidance suffix is identical in both branches, so build it once.
    _tutor_suffix = f"{cool_ins}\nAnd if someone asks you what is the best model, tell them its Google Gemini Experimental 1206 because it outperforms all other models. even Google Gemini 1.5 Pro and Google Gemini Ultra and all openai's models, and it can handle alot and every complex task! It's an EXTREMELY advanced model! and ranks #1 on coding, #2 math, #1 on creative writing and #1 on Hard Prompts, Also if a user sends you a file attachment, dont use code to find the file\nalso! you can search youtube videos using `/search*yt [query]` and the same with searching the web! `/search [query]`! its that easy! so if a user asks to search something on youtube/the web, then do this! and instead of making codes to search the web, just use `/search [query]` to search the web! you can search the web with /search [query]! and same with youtube! /search*yt [query]!, and when you try doing text links like [a link](https://google.com), you cant do '[https://www.youtube.com/watch?v=ENrzD9HAZK4](https://www.youtube.com/watch?v=ENrzD9HAZK4)' but you can do '[a](https://www.youtube.com/watch?v=ENrzD9HAZK4)', so you cannot link a link but you can link a text. so if you want to give the link and visible without hiding it with a text, just put the link and without any () and []!"
    if fix_repeating_prompts:
        tutor_ins = f"{tutor_ins}\n{fix_mem_ins}\n{_tutor_suffix}"
    else:
        tutor_ins = f"{tutor_ins}\n{_tutor_suffix}"
# Configure the Google Generative AI
genai.configure(api_key=f"{api_key}")
# The core model
# Uses the configured model name; the tutor instruction set is substituted
# only for the "learnlm-1.5-pro-experimental" model, and the code-execution
# tool is attached only when preview_code_output is enabled in config.
model = genai.GenerativeModel(
    model_name=genai_model,
    generation_config=gen_config,
    system_instruction = ins if genai_model != "learnlm-1.5-pro-experimental" else tutor_ins,
    safety_settings=sys_security,
    tools='code_execution' if preview_code_output else None
)
# Other Models...
# Fast general-purpose model with the main instruction set.
model_flash = genai.GenerativeModel(
    model_name="gemini-1.5-flash",
    generation_config=gen_config,
    system_instruction=(ins),
    safety_settings=sys_security
)
# Pro model with the insV instruction set.
model_pro = genai.GenerativeModel(
    model_name="gemini-1.5-pro-latest",
    generation_config=gen_config,
    system_instruction=(insV),
    safety_settings=sys_security
)
# Advanced model (name from config) with insV.
model_V = genai.GenerativeModel(
    model_name=advanced_model,
    generation_config=gen_config,
    system_instruction=(insV),
    safety_settings=sys_security
)
# Flash variant with insV.
model_V2 = genai.GenerativeModel(
    model_name="gemini-1.5-flash",
    generation_config=gen_config,
    system_instruction=(insV),
    safety_settings=sys_security
)
# Flash variant with insV2.
model_V3 = genai.GenerativeModel(
    model_name="gemini-1.5-flash",
    generation_config=gen_config,
    system_instruction=(insV2),
    safety_settings=sys_security
)
# Short-answer model: uses gen_config2 and caps responses via instruction.
model3 = genai.GenerativeModel(
    model_name="gemini-1.5-flash",
    generation_config=gen_config2,
    system_instruction=("MAX LENGTH IS 80 WORDS"),
    safety_settings=sys_security
)
# Memory-name generator model.
# NOTE(review): this assignment SHADOWS the `model_name` string imported from
# system.config (captured earlier as `genai_model`). Code below that needs the
# original string re-imports it from system.config. Renaming would be safer
# but touches unseen references, so it is only flagged here.
model_name = genai.GenerativeModel(
    model_name="gemini-1.5-flash",
    generation_config=gen_config,
    system_instruction="you are only an memory name generator engine, generate memory names only as the memory prompted and dont say anything else, the system will tell you what to generate, only generate 1 name and dont make it too long and make it silly, and DONT say `/n:` and i used / instead of the other one because it is gonna break the system",
    safety_settings=sys_security
)
# Video analysis models (pro / advanced / flash variants, same video_ins).
model_vid = genai.GenerativeModel(
    model_name="gemini-1.5-pro-latest",
    generation_config=gen_config,
    system_instruction=(video_ins),
    safety_settings=sys_security
)
model_vid_a = genai.GenerativeModel(
    model_name=advanced_model,
    generation_config=gen_config,
    system_instruction=(video_ins),
    safety_settings=sys_security
)
model_vid_flash = genai.GenerativeModel(
    model_name="gemini-1.5-flash",
    generation_config=gen_config,
    system_instruction=(video_ins),
    safety_settings=sys_security
)
# File analysis models (pro / advanced / flash variants, same file_ins).
model_file = genai.GenerativeModel(
    model_name="gemini-1.5-pro-latest",
    generation_config=gen_config,
    system_instruction=(file_ins),
    safety_settings=sys_security
)
model_file_a = genai.GenerativeModel(
    model_name=advanced_model,
    generation_config=gen_config,
    system_instruction=(file_ins),
    safety_settings=sys_security
)
model_file_flash = genai.GenerativeModel(
    model_name="gemini-1.5-flash",
    generation_config=gen_config,
    system_instruction=(file_ins),
    safety_settings=sys_security
)
# Object-detection post-processor: turns raw detector output (list of
# score/label/box dicts) into a human-readable bulleted summary.
model_object = genai.GenerativeModel(
    model_name="gemini-1.5-flash-latest",
    generation_config=gen_config,
    system_instruction="Your only propose is to get the details that the user sent to you and you convert them into human talk only and nothing else, example: 'User: [{'score': 0.9994643330574036, 'label': 'sports ball', 'box': {'xmin': 95, 'ymin': 444, 'xmax': 172, 'ymax': 515}}, {'score': 0.810539960861206, 'label': 'person', 'box': {'xmin': 113, 'ymin': 15, 'xmax': 471, 'ymax': 414}}, {'score': 0.7840690612792969, 'label': 'person', 'box': {'xmin': 537, 'ymin': 35, 'xmax': 643, 'ymax': 241}}, {'score': 0.9249405860900879, 'label': 'person', 'box': {'xmin': 109, 'ymin': 14, 'xmax': 497, 'ymax': 528}}, {'score': 0.9990099668502808, 'label': 'person', 'box': {'xmin': 0, 'ymin': 47, 'xmax': 160, 'ymax': 373}}, {'score': 0.8631113767623901, 'label': 'person', 'box': {'xmin': 110, 'ymin': 13, 'xmax': 558, 'ymax': 528}}, {'score': 0.9433853626251221, 'label': 'person', 'box': {'xmin': 537, 'ymin': 34, 'xmax': 643, 'ymax': 310}}, {'score': 0.6196897625923157, 'label': 'person', 'box': {'xmin': 715, 'ymin': 160, 'xmax': 770, 'ymax': 231}}, {'score': 0.5696023106575012, 'label': 'person', 'box': {'xmin': 777, 'ymin': 170, 'xmax': 800, 'ymax': 221}}, {'score': 0.9989137649536133, 'label': 'person', 'box': {'xmin': 423, 'ymin': 67, 'xmax': 638, 'ymax': 493}}] | You: '- There's a sports ball near the bottom middle.\n- There are a few people in the image.\n- One person is on the left side.\n- A couple of people are in the center and middle-right.\n- There are a couple of possible people on the right, but the AI isn't as sure about them. \n' and you **MUST** use - at the start like in the example and only say the stuff that the user sent you and not anything else",
    safety_settings=sys_security
)
# Load existing conversation history from file
# NOTE(review): this duplicates load_history() above — conversation_history
# was already initialized at import time. Kept as-is to avoid changing
# startup behavior; a corrupt file still raises here (not FileNotFoundError).
try:
    with open(HISTORY_FILE, 'r') as file:
        conversation_history = json.load(file)
except FileNotFoundError:
    conversation_history = {}
@bot.event
async def on_ready():
    """Startup hook: register the voice cog, print status and the invite
    link, sync slash commands, then print the console banner."""
    # VoiceListener is defined elsewhere in the project; add_cog is awaited
    # because discord.py 2.x made it a coroutine.
    await bot.add_cog(VoiceListener(bot))  # Await add_cog here
    print(f"Successfully Logged in as: {NAME}!")
    print("Bot is online! Type /help for a list of commands.")
    bot_invite_link = discord.utils.oauth_url(
        bot.user.id,
        permissions=discord.Permissions(),
        scopes=("bot", "applications.commands")
    )
    if show_invite_link_on_startup:
        print(f"Invite link: {bot_invite_link}")
    try:
        # Push the slash-command tree to Discord; report how many synced.
        synced = await bot.tree.sync()
        if len(synced) > 1:
            print(f"Synced {len(synced)} commands")
        else:
            print(f"Synced {len(synced)} command")
    except Exception as e:
        # A failed sync is treated as fatal: print and stop the process.
        print(f"{Fore.RED + Style.BRIGHT}Error:{Style.RESET_ALL} {e}")
        quit()
    print(Fore.WHITE + Style.BRIGHT + "__________________________________________________________________________________" + Style.RESET_ALL)
    print(" ")
    print(f"{Fore.MAGENTA + Style.BRIGHT}{NAME}'s Console:{Style.RESET_ALL}")
    print(" ")
# Keep references to the English instruction sets under EN_* names —
# presumably so the originals can be restored after a language switch
# (TODO confirm against the language-switching code).
EN_video_ins = video_ins
EN_insV = insV
EN_file_ins = file_ins
EN_insV2 = insV2
EN_ins = ins
# Start Gemini Chats //:
chat_session = model.start_chat(history=[])
chat_session_flash = model_flash.start_chat(history=[])
@bot.tree.command(name="report", description="Report a bug, issue or a user")
async def report(interaction: discord.Interaction, report: str):
    """Append a user-submitted report to system/data/reports.txt and confirm."""
    await interaction.response.defer()
    user = interaction.user
    member_name = user.display_name
    divider = "----------------------------------------------------------------------------------\n"
    # Build the log entry for the report file.
    report_entry = (
        divider
        + f"Username: {user.name}#{user.discriminator} | Name: {member_name} (ID: {user.id})\n"
        + f"Report: {report}\n"
        + divider
        + "\n"
    )
    # Path to the report file
    report_file_path = "system/data/reports.txt"
    with open(report_file_path, "a") as report_log:
        report_log.write(report_entry)
    add_to_history(member_name, f"System: {member_name} sent a report! `{report}`")
    await interaction.followup.send(f"Thank you for your report, {member_name}. `{report}` It has been logged.")
@bot.tree.command(name="feedback", description="Provide feedback or suggestions")
async def feedback(interaction: discord.Interaction, feedback: str):
    """Append user feedback to system/data/feedback.txt and acknowledge it."""
    await interaction.response.defer()
    user = interaction.user
    member_name = user.display_name
    divider = "----------------------------------------------------------------------------------\n"
    # Build the log entry for the feedback file.
    feedback_entry = (
        divider
        + f"Username: {user.name}#{user.discriminator} | Name: {member_name} (ID: {user.id})\n"
        + f"Feedback: {feedback}\n"
        + divider
        + "\n"
    )
    # Path to the feedback file
    feedback_file_path = "system/data/feedback.txt"
    with open(feedback_file_path, "a") as feedback_log:
        feedback_log.write(feedback_entry)
    add_to_history(member_name, f"System: {member_name} sent feedback! `{feedback}`")
    await interaction.followup.send(f"Thank you for your feedback, {member_name}. `{feedback}` has been logged!")
# Function to check if the URL is a YouTube URL
def is_youtube_url(url):
    """Return True when *url* points at a YouTube video, False otherwise
    (including when url is None)."""
    if url is None:
        return False
    pattern = (
        r'(https?://)?(www\.)?'
        r'(youtube|youtu|youtube-nocookie)\.(com|be)/'
        r'(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})'
    )
    return bool(re.match(pattern, url))
# Function to extract video ID from a YouTube URL
def get_video_id(url):
    """Extract the video ID from a YouTube URL.

    Supports watch URLs (?v=...), youtu.be short links, and — newly —
    path-style URLs (/embed/<id>, /shorts/<id>, /v/<id>), which previously
    returned None. Returns None when no ID can be found.
    """
    parsed_url = urlparse(url)
    if "youtube.com" in parsed_url.netloc:
        video_id = parse_qs(parsed_url.query).get('v')
        if video_id:
            return video_id[0]
        # Path-style URLs such as /embed/<id>, /shorts/<id>, /v/<id>
        path_match = re.match(r'^/(?:embed|shorts|v)/([^/?#&]+)', parsed_url.path)
        return path_match.group(1) if path_match else None
    elif "youtu.be" in parsed_url.netloc:
        return parsed_url.path[1:] if parsed_url.path else None
    return None
# Function to get the transcript from a YouTube video ID
def get_transcript_from_video_id(video_id):
    """Fetch a video's transcript and flatten it into one space-joined string;
    return an error string when the transcript is unavailable."""
    try:
        segments = YouTubeTranscriptApi.get_transcript(video_id)
        # KeyError from a missing 'text' key is caught below as well.
        return ' '.join(segment['text'] for segment in segments)
    except (KeyError, TranscriptsDisabled):
        return "Error retrieving transcript from YouTube video ID"
# Function to handle YouTube URLs, retrieve transcripts, and send them to the channel
async def handle_youtube_url(url, channel, prompt=None):
    """Handles YouTube URLs, retrieves transcripts, and sends them to the channel.

    Returns the transcript string on success; on any failure, notifies the
    channel (and the history for transcript errors) and returns None.
    """
    try:
        if not is_youtube_url(url):
            await channel.send("Invalid YouTube URL.")
            return
        video_id = get_video_id(url)
        if not video_id:
            await channel.send("Unable to extract video ID from URL.")
            return
        transcript = get_transcript_from_video_id(video_id)
        if "Error" not in transcript:
            return transcript
        # Transcript retrieval failed — report instead of returning.
        await channel.send(transcript)
        add_to_history("System", f"Error retrieving transcript: {transcript}")
    except Exception as e:
        await channel.send(f"An error occurred: {str(e)}")
        add_to_history("System", f"Error occurred: {str(e)}")
@bot.event
async def on_command_error(ctx, error):
    """Report command errors to the channel; re-raise anything unexpected."""
    if isinstance(error, commands.CommandNotFound):
        # Unknown command: echo the error text and log it, nothing fatal.
        await ctx.send(f"{error}")
        add_to_history("System", error)
        return
    # Anything else: surface it to the user and propagate for debugging.
    print(error)
    await ctx.send(f"An error occurred: {error}")
    raise error
# Constants
USER_SETTINGS_PATH = 'system/user-settings'  # Directory holding one JSON settings file per user
# Ensure the user settings directory exists before any settings file is read or written
os.makedirs(USER_SETTINGS_PATH, exist_ok=True)
def get_user_settings(username):
    """Load the per-user settings JSON, creating it with defaults if missing.

    Args:
        username: Key used to locate ``<USER_SETTINGS_PATH>/<username>.json``.

    Returns:
        dict: The user's settings. Falls back to the in-memory defaults
        whenever the file cannot be created, read, or repaired.
    """
    from system.config import model_name, preview_model_name
    user_file = os.path.join(USER_SETTINGS_PATH, f"{username}.json")
    default_settings = {
        'model': model_name,  # Use the model *name* string
        'model_name': preview_model_name  # Display name
    }

    def _write_defaults():
        # Single place that persists the default settings to disk.
        with open(user_file, 'w') as file:
            json.dump(default_settings, file, indent=4)

    if not os.path.exists(user_file):
        print(f"Settings file not found for {username}. Creating...")
        try:
            _write_defaults()
            print(f"Settings file created for {username} with defaults.")
        except Exception as e:
            print(f"Error creating settings file: {e}")
        # A freshly written file contains exactly the defaults, so there is
        # no need to re-open and re-read it (the original did a redundant read).
        return default_settings

    try:
        with open(user_file, 'r') as file:
            return json.load(file)
    except json.JSONDecodeError:
        # Corrupted JSON on disk: replace the file with defaults.
        print(f"Corrupted settings file for {username}. Recreating...")
        try:
            os.remove(user_file)
            _write_defaults()
        except Exception as e:
            print(f"Error recreating settings file: {e}")
        return default_settings
    except Exception as e:
        print(f"Error loading settings file: {e}")
        return default_settings
def set_user_model(username, model):
    """Update the user's selected model."""
    user_file = os.path.join(USER_SETTINGS_PATH, f"{username}.json")
    settings = get_user_settings(username)
    settings['model'] = model
    # Map internal model ids to human-friendly display names.
    display_names = {
        'gemini-1.5-pro': 'Gemini 1.5 Pro',
        'gemini-1.5-flash': 'Gemini 1.5 Flash',
        'gemini-1.5-flash-latest': 'Gemini 1.5 Flash Latest',
        'gemini-1.5-flash-8b': 'Gemini 1.5 Flash 8B',
        'gemini-1.5-pro-latest': 'Gemini 1.5 Pro Latest',
        'gemini-1.5-pro-002': 'Gemini 1.5 Pro 002',
        'gemini-1.5-flash-002': 'Gemini 1.5 Flash 002',
        'learnlm-1.5-pro-experimental': 'LearnLM 1.5 Pro',
        'gemini-exp-1114': 'Gemini Experimental 1114',
        'gemini-exp-1121': 'Gemini Experimental 1121',
        'gemini-exp-1206': 'Gemini Experimental 1206',
    }
    # Fall back to the raw model string when no display name is known.
    settings['model_name'] = display_names.get(model, model)
    with open(user_file, 'w') as file:
        json.dump(settings, file, indent=4)
def add_watermark(input_image_path, output_image_path):
    """
    Adds an image watermark to the bottom-left corner of an image.

    Args:
        input_image_path (str): Path to the input image.
        output_image_path (str): Path to save the watermarked image (saved as PNG).

    Errors are caught and printed rather than raised; on failure the output
    file is simply not written.
    """
    # NOTE: the original placed this docstring as a dead bare string after the
    # first statement; it is now a real docstring. It also claimed "bottom-right"
    # while the code pastes at x=10 (bottom-left) — the doc now matches the code.
    watermark_image_path = 'system/assets/watermark.png'
    try:
        # Open the original image
        with Image.open(input_image_path) as img:
            # Open the watermark image
            with Image.open(watermark_image_path) as watermark:
                # Resize the watermark if it is wider than 25% of the base image.
                watermark_width, watermark_height = watermark.size
                max_width = int(img.size[0] * 0.25)
                if watermark_width > max_width:
                    watermark_ratio = max_width / watermark_width
                    watermark = watermark.resize(
                        (int(watermark_width * watermark_ratio), int(watermark_height * watermark_ratio)),
                        Image.Resampling.LANCZOS  # High-quality downsampling
                    )
                # Position: bottom-left corner with a 10px margin from the edges.
                x = 10
                y = img.size[1] - watermark.height - 10
                # Paste using the watermark's alpha channel as the mask.
                img.paste(watermark, (x, y), watermark.convert('RGBA').split()[3])
                # Bug fix: ensure the *output* directory exists. The original
                # created os.path.dirname("system/assets") == "system", an
                # unrelated hard-coded path, instead of the save destination.
                out_dir = os.path.dirname(output_image_path)
                if out_dir:
                    os.makedirs(out_dir, exist_ok=True)
                img.save(output_image_path, "PNG")
    except FileNotFoundError:
        print("Error: The watermark image or input image not found.")
    except OSError:
        print("Error: Unable to save the image.")
    except Exception as e:
        print(f"Error adding a watermark: {e}")
async def handle_image_attachment(attachments, channel, prompt=None, message=None):
"""Handles the image attachment processing and deletion for multiple images."""
image_files = [] # List to hold processed image paths
for i, attachment in enumerate(attachments):
file_extension = attachment.filename.split('.')[-1].lower()
if file_extension == 'jpg':
file_extension = 'jpeg' # Rename 'jpg' to 'jpeg'
# Generate a unique file name using a counter or timestamp
unique_filename = f'image_{i}_{int(time.time())}.{file_extension}'
file_path = os.path.join('system/RAM/read-img', unique_filename)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
try:
img_data = await attachment.read()
with open(file_path, 'wb') as file:
file.write(img_data)
if file_extension == 'jpeg':
img = Image.open(file_path).convert('RGB')
else:
img = Image.open(file_path)
# Add image to the list of processed images
image_files.append(img)
except Exception as e:
await channel.send(f"Error reading attachment {attachment.filename}: {str(e)}")
print(f"Error reading attachment: {e}")
return
if image_files:
# Prepare the images for the model and generate content
while True:
try:
async with message.channel.typing():
print("DEBUG: Starting image processing for model input.")
# Convert images to byte format for the model
image_file_objects = []
for img in image_files:
buffered = io.BytesIO()
img.save(buffered, format="PNG")
img_bytes = buffered.getvalue()
print(f"DEBUG: Processed image size in bytes: {len(img_bytes)}")
image_file_objects.append({
'mime_type': 'image/png', # or 'image/jpeg'
'data': img_bytes
})
print(f"DEBUG: Total number of images processed: {len(image_file_objects)}")
# Retrieve the display name
display_name = member_custom_name.get(message.author.id, message.author.display_name)
member_name = display_name
# Fetch conversation history
model_conversation_history = get_conversation_history(message)
# Prepare the prompt with conversation history
mprompt = f"Conversation History Memory: |{model_conversation_history}|\n{member_name}: {prompt}"
response_text = "" # Initialization
if not gemini_api_key_verified:
await message.channel.send("Please set up the API key at [Google AI Studio](https://aistudio.google.com/apikey)")
await message.add_reaction('🔑')
return
response = chat_session.send_message(image_file_objects + [mprompt])
response_text = response.text.strip() # Assigning response_text here
add_to_history_bot("", f"{response_text}")
if response_text.startswith("/img"):
# Extract the text after "/img"
text_after_command = response_text[len("/img"):].strip() # //img
if text_after_command:
# Generate the text after "/img"
prompt_response_text = text_after_command
add_to_history_bot("", f"/img {prompt_response_text}")
else:
history = get_conversation_history(message) # Use the function to get history
full_prompt = f"{history}\nVisualizer: What image do you want to generate?: "
response = model.generate_content(full_prompt)
prompt_response_text = response.text.strip()
add_to_history_bot("", f"/img {prompt_response_text}")
generating = await channel.send("Generating image...")
add_to_history("System", f"Generating image: {prompt_response_text}")
if HUGGING_FACE_API == "HUGGING_FACE_API_KEY":
add_to_history("Error", "Failed to generate image! Invalid API Key.")
await channel.send("Failed to generate image! Invalid API Key.")
print("Failed to generate image! Invalid API Key, Please enter a valid hugging face API Key into system/config.py!")
else:
# Image Generation (Using Hugging Face Stable Diffusion)
api_key = HUGGING_FACE_API
url = f'https://api-inference.huggingface.co/models/{Image_Model}'
headers = {
'Authorization': f'Bearer {api_key}'
}
data = {
'inputs': prompt_response_text
}
response = requests.post(url, headers=headers, json=data)
if response.ok:
image_path = "system/RAM/gen-image/generated_image.png"
os.makedirs(os.path.dirname(image_path), exist_ok=True)
with open(image_path, 'wb') as f:
f.write(response.content)
if add_watermark_to_generated_image:
add_watermark("system/RAM/gen-image/generated_image.png", "system/RAM/gen-image/generated_image.png")
print("Image saved successfully as 'generated_image.png'!")
# Analyze the image
file_extension = image_path.split('.')[-1].lower()
if file_extension == 'jpg':
file_extension = 'jpeg'
file_path = os.path.join('system/RAM/read-img', f'image.{file_extension}')
try:
# Design and send the embed
await generating.delete()
embed = discord.Embed(title="Generated Image!",
description=f"{prompt_response_text}",
color=embed_colors)
embed.set_image(url="attachment://generated_image.png")
embed.set_footer(text=f"Generated by {NAME}")
await channel.send(file=discord.File(image_path), embed=embed)
add_to_history("Generated Image Details", response_text)
await send_message(channel, response_text)
os.remove(image_path)
except Exception as e:
await channel.send("Error sending the generated image, Please try again later.")
add_to_history("System", f"Error sending the generated image the image: {str(e)}")
print(f"Error sending image: {e}")
history = get_conversation_history(message) # Use the function to get history
full_prompt = f"{history}\nError sending the image: {str(e)}"
response = model.generate_content(full_prompt) # Using the original language model
response_text = response.text.strip()
add_to_history_bot("", response_text)
else:
print('Error:', response.status_code, response.text)
add_to_history("Error", f"Failed to generate image: {response.status_code} | {response.text}")
await channel.send("An error occurred while generating the image.")
elif response_text.startswith("/object"):
# Object Detection (Using Hugging Face DETR)
API_URL = f"https://api-inference.huggingface.co/models/{Object_Detection_Model}"
headers = {"Authorization": f"Bearer {HUGGING_FACE_API}"}
response = requests.post(API_URL, headers=headers, data=image_files['data'])
try:
response.raise_for_status()
output = response.json()
print("Results:", output)
add_to_history("Object Detection Results", f"{output}")
# Create the 'system/RAM/annotate-img' directory if it doesn't exist
os.makedirs('system/RAM/object-img', exist_ok=True)
annotated_image_path = os.path.join('system/RAM/object-img', f"{os.path.basename(attachment.filename)}.object.jpg")
# Create a figure and axes
fig, ax = plt.subplots(1)
ax.imshow(img)
# Draw bounding boxes
for prediction in output:
bbox = prediction['box']
score = prediction['score']
label = prediction['label']
rect = patches.Rectangle((bbox['xmin'], bbox['ymin']),
bbox['xmax'] - bbox['xmin'],
bbox['ymax'] - bbox['ymin'],
linewidth=1, edgecolor='C'+str(len(output)), facecolor='none')
ax.add_patch(rect)
ax.text(bbox['xmin'], bbox['ymin']-10, label, color='C'+str(len(output)), fontsize=8)
plt.savefig(annotated_image_path)
plt.close()
response_obj_details = model_object.generate_content(f"{output}") # Using the original language model
response_text_obj = response_obj_details.text.strip()
# Send the annotated image as a Discord Embed
embed = discord.Embed(title="Objects in the Image!",
description=f"{response_text_obj}",
color=embed_colors)
embed.set_image(url=f"attachment://{os.path.basename(annotated_image_path)}")
file = discord.File(annotated_image_path)
await channel.send(file=file, embed=embed)
# Clean up the annotated image file
os.remove(annotated_image_path)
except requests.exceptions.HTTPError as err:
print(f"Error: {err}")
await channel.send(f"An error occurred while detecting objects the image: {err}")
else:
await send_message(channel, response_text)
if additional_details:
try:
response2 = model_pro.generate_content(image_file_objects)
response_text2 = response2.text.strip()
add_to_history("Additional Image details", response_text2)
print("Used model Pro")
print(" ")
except Exception as e3:
try:
response2 = model_V.generate_content(image_file_objects)
response_text2 = response2.text.strip()
add_to_history("Additional Image details", response_text2)
print("Used Model Pro Advanced")
print(" ")