-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmultisnake.pyw
483 lines (459 loc) · 16.2 KB
/
multisnake.pyw
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
#low-level
import random
from os import getpid
import asyncio
import sys
import time
import json
#mid-level
from hashlib import sha256
from contextlib import suppress
#3rd-party
from pypresence import Client as shush, AioClient as Client, InvalidPipe
import websockets
import aiohttp
import pygame
from pygame.locals import *
import traceback
import warnings
def warn_with_traceback(message, category, filename, lineno, file=None, line=None):
    """Show a warning preceded by the call stack that triggered it.

    Has the same signature as ``warnings.showwarning`` so it can replace it.
    Output goes to *file* when that object has a ``write`` attribute, and to
    ``sys.stderr`` otherwise.
    """
    sink = sys.stderr
    if hasattr(file, 'write'):
        sink = file
    # Stack first, then the normally formatted warning text.
    traceback.print_stack(file=sink)
    formatted = warnings.formatwarning(message, category, filename, lineno, line)
    sink.write(formatted)


# Route every warning through the stack-printing hook above.
warnings.showwarning = warn_with_traceback
# --- Grid geometry, in pixels -------------------------------------------
BLOCW = 20
BLOCH = 20
BLOCS = (BLOCW, BLOCH)      # size of one grid cell
WIDTH = 60 * BLOCW
HEIGHT = 35 * BLOCH
SIZE = (WIDTH, HEIGHT)      # window size: 60 x 35 cells

# --- Timing --------------------------------------------------------------
FPS = 25                    # game-loop iterations per second

# --- Direction markers (also sent over the wire in 'd' messages) ---------
SN_L = '<'
SN_R = '>'
SN_U = '^'
SN_D = 'v'

# --- Apple tuning --------------------------------------------------------
# NOTE(review): neither value is read anywhere in this file; presumably
# shared with the server / solo game -- confirm before removing.
BIGAPPLE_TIME = 8 * FPS
DEDAPPLE_FREQ = 6

# --- Identity ------------------------------------------------------------
PID = getpid()
MID = -1                    # id of the local snake; -1 until the server assigns one
# Lobby id: hex SHA-256 of the process id concatenated with the current time.
GID = sha256('{}{}'.format(PID, time.time()).encode('ascii')).hexdigest()

# Server host: second command-line argument, defaulting to localhost.
SERV = 'localhost'
if len(sys.argv) > 2:
    SERV = sys.argv[2]
def m(*a):
    """Encode the positional arguments as a single JSON array string."""
    encoded = json.dumps(a)
    return encoded
def n(s):
    """Decode the JSON document *s* and return the resulting Python value."""
    decoded = json.loads(s)
    return decoded
class Snake(pygame.sprite.Sprite):
    """Head of one player's snake.

    Attributes:
        direction: one of SN_L / SN_R / SN_U / SN_D; the way the head moves.
        len: current length; copied into each Trail segment as its lifetime.
        id: player id this snake belongs to.
    """

    def __init__(self, sid=0):
        """Create a one-cell snake at the top-left corner, heading right.

        Args:
            sid: player id stored on the sprite as ``id``.
        """
        # Zero-argument super(): ``super(type(self), self)`` recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.image = pygame.Surface(BLOCS)
        # Random bright colour (each channel 128-255).
        self.image.fill((
            random.randint(128, 255),
            random.randint(128, 255),
            random.randint(128, 255)
        ))
        self.rect = self.image.get_rect()
        self.rect.x, self.rect.y = 0, 0
        self.direction = SN_R
        self.len = 1
        self.id = sid

    def update(self):
        """Leave a trail segment behind, then advance one cell with wrap-around."""
        # The cell being vacated becomes a body segment that lives ``len`` ticks.
        trail.add(Trail(self.rect.x, self.rect.y, self.len))
        if self.direction == SN_D:
            self.rect.y += BLOCH
        elif self.direction == SN_U:
            self.rect.y -= BLOCH
        elif self.direction == SN_L:
            self.rect.x -= BLOCW
        elif self.direction == SN_R:
            self.rect.x += BLOCW
        # Leaving one edge of the board re-enters from the opposite edge.
        self.rect.x %= WIDTH
        self.rect.y %= HEIGHT
class Trail(pygame.sprite.Sprite):
    """One white body segment that disappears after a fixed number of updates."""

    def __init__(self, x, y, len):
        """Create a segment at pixel position (*x*, *y*).

        Args:
            x: left edge in pixels.
            y: top edge in pixels.
            len: number of ``update()`` calls the segment survives. The name
                shadows the builtin but is kept for caller compatibility.
        """
        # Zero-argument super(): ``super(type(self), self)`` recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.len = len
        self.image = pygame.Surface(BLOCS)
        self.image.fill((255, 255, 255))
        self.rect = self.image.get_rect()
        self.rect.x, self.rect.y = x, y

    def update(self):
        """Age the segment by one tick and remove it once its lifetime is used up."""
        self.len -= 1
        if self.len <= 0:
            self.kill()
class Apple(pygame.sprite.Sprite):
    """Red one-cell apple worth one unit of length."""

    def __init__(self):
        """Create the apple on a random grid-aligned cell."""
        # Zero-argument super(): ``super(type(self), self)`` recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.image = pygame.Surface(BLOCS)
        self.image.fill((255, 0, 0))
        self.rect = self.image.get_rect()
        # Floor-divide then multiply snaps the random pixel to the grid.
        self.rect.x = random.randint(0, WIDTH - 1) // BLOCW * BLOCW
        self.rect.y = random.randint(0, HEIGHT - 1) // BLOCH * BLOCH

    async def update(self, ws):
        """Handle a snake head overlapping the apple.

        Args:
            ws: game websocket; an ``'a'`` message is sent on it when the
                colliding snake is the local one (``id == MID``).
        """
        for snak in pygame.sprite.spritecollide(self, snake, False):
            snak.len += 1
            self.image.fill((255, 0, 0))
            # The apple is not moved here: sock() repositions it from the
            # coordinates carried by the server's 'a' message.
            if snak.id == MID:
                await ws.send(m('a'))
            # NOTE(review): only the first colliding snake is handled per
            # call -- confirm this is the intended behaviour.
            break
class BigApple(pygame.sprite.Sprite):
    """Yellow 2x2-cell apple worth four units of length."""

    def __init__(self):
        """Create the big apple on a random position aligned to a 2-cell grid."""
        # Zero-argument super(): ``super(type(self), self)`` recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.image = pygame.Surface((BLOCW * 2, BLOCH * 2))
        self.image.fill((255, 255, 0))
        self.rect = self.image.get_rect()
        self.rect.x = random.randint(0, WIDTH - 1) // (BLOCW * 2) * (BLOCW * 2)
        self.rect.y = random.randint(0, HEIGHT - 1) // (BLOCH * 2) * (BLOCH * 2)

    async def update(self, ws):
        """Handle a snake head overlapping the big apple.

        Args:
            ws: game websocket; a ``'b'`` message is sent on it when the
                colliding snake is the local one (``id == MID``).
        """
        collides = pygame.sprite.spritecollide(self, snake, False)
        for snak in collides:
            snak.len += 4
            # Remove the sprite from every group; sock() re-adds it at the
            # position carried by the server's 'B' message.
            self.kill()
            if snak.id == MID:
                await ws.send(m('b'))
            # NOTE(review): only the first colliding snake is handled per
            # call -- confirm this is the intended behaviour.
            break
# Bring up pygame and open the window before any sprite is instantiated.
pygame.init()
SCREEN = pygame.display.set_mode(SIZE)

# Sprite groups used as module globals by the sprite classes and coroutines.
snake = pygame.sprite.RenderPlain()           # snake heads
trail = pygame.sprite.RenderPlain()           # body segments
apple = pygame.sprite.RenderPlain(Apple())    # apples currently on the board
thebigapple = BigApple()                      # created up front, not yet in ``apple``
frames = 0                                    # frame counter advanced by game()

# A proactor loop is created explicitly on Windows; elsewhere the default
# event loop is used.
loop = (
    asyncio.ProactorEventLoop()
    if sys.platform.startswith('win32')
    else asyncio.get_event_loop()
)
async def register_event(self, event: str, func: callable, args: dict = None):
    """Subscribe to *event* and remember *func* as its handler.

    Written to be attached to the RPC client class as a method.

    Args:
        self: client instance providing ``subscribe()`` and ``_events``.
        event: event name; the handler is stored under its lower-cased form.
        func: callable registered as the handler for the event.
        args: optional subscription arguments; a fresh empty dict is used
            when omitted.

    Raises:
        TypeError: if *func* is not callable.
    """
    if not callable(func):
        raise TypeError(
            'func must be callable, got {}'.format(type(func).__name__)
        )
    # A ``None`` default avoids sharing one mutable dict across every call.
    await self.subscribe(event, {} if args is None else args)
    self._events[event.lower()] = func
# Patch the async client class (imported as ``Client``) with the event
# dispatcher of the synchronous client (imported as ``shush``) and with the
# coroutine ``register_event`` defined in this file.
Client.on_event = shush.on_event
Client.register_event = register_event

# Discord RPC client; the IPC pipe comes from the first command-line
# argument and defaults to 0.
RPC = Client(
    '511398745057787905',
    loop=loop,
    pipe=sys.argv[1] if len(sys.argv) > 1 else 0,
)

# Activity payload passed to RPC.set_activity(**status); mutated in place
# by the coroutines in this file.
status = dict(pid=PID, large_image='python-logo')
def mktext(surf, text, pos, size=15, color=(255, 255, 255)):
    """Render *text* in the system monospace font and blit it onto *surf*.

    Args:
        surf: destination surface.
        text: string to draw.
        pos: position passed straight to ``surf.blit``.
        size: font size handed to ``pygame.font.SysFont``.
        color: text colour, white by default.
    """
    rendered = pygame.font.SysFont('monospace', size).render(text, 1, color)
    surf.blit(rendered, pos)
async def intro():
    """Show the title screen and return the mode the player picks.

    Starts the Discord RPC client; if that fails, ``RPC`` is set to ``None``
    and the online option is neither advertised nor accepted.

    Returns:
        1 for solo / local multiplayer, 2 for online multiplayer.

    Raises:
        SystemExit: when the window is closed or Escape is pressed.
    """
    global RPC
    # Reset the presence payload to the intro state.
    status.clear()
    status['pid'] = PID
    status['large_image'] = 'python-logo'
    status['state'] = 'In Intro'
    try:
        await RPC.start()
    except (AttributeError, InvalidPipe) as exc:
        # AttributeError also covers RPC already being None from an earlier
        # failed attempt.
        RPC = None
        print(type(exc).__name__, exc, sep=': ')
    if RPC:
        await RPC.set_activity(**status)
    online_hint = ' Press 2 for online multiplayer.' if RPC is not None else ''
    SCREEN.fill((0, 0, 0))
    mktext(SCREEN, 'If you hit anything white except a snake head you die.', (0, 0))
    mktext(SCREEN, 'At any point, Escape or close the window to quit.', (0, BLOCH))
    mktext(SCREEN, 'Press 1 for solo snake or local multiplayer.' + online_hint, (0, BLOCH * 2))
    pygame.display.flip()
    while True:
        for e in pygame.event.get():
            wants_quit = e.type == QUIT or (e.type == KEYDOWN and e.key == K_ESCAPE)
            if wants_quit:
                raise SystemExit(0)
            if e.type != KEYDOWN:
                continue
            if e.key == K_1:
                return 1
            if e.key == K_2 and RPC is not None:
                return 2
        # Yield to the event loop between polls, one frame at a time.
        await asyncio.sleep(1/FPS)
async def party():
    """Run the online lobby and return the room id announced by the server.

    Phase 1 opens a party websocket for this client's own room (``GID``) and
    waits until either the server sends a message on it ('other joined') or
    Discord delivers an ACTIVITY_JOIN event ('joined other'). In the latter
    case the socket is reopened on the joined room and the party size is
    fetched over HTTP.

    Phase 2 is the ready-up screen: Space readies up (or starts the game once
    the server has reported 'startable'), Escape un-readies, and the loop ends
    when the server sends a 'start: <id>' message.

    Returns:
        The text following 'start: ' in the server's start message.

    Raises:
        SystemExit: when the window is closed in either phase.
    """
    #client start
    session = aiohttp.ClientSession(loop=loop)
    #connect - secret = server room
    SCREEN.fill((0, 0, 0))
    mktext(SCREEN, 'Connecting...', (0, 0))
    pygame.display.flip()
    path = GID
    ws = await websockets.connect('ws://' + SERV + ':8080/party/{}'.format(path))
    status['state'] = 'In Lobby'
    status['party_id'] = GID
    status['party_size'] = [1, 2]
    # The join secret is the party id with every character shifted up by one
    # code point; handler() below reverses the shift.
    status['join'] = ''.join(chr(ord(i) + 1) for i in GID)
    await RPC.set_activity(**status)
    #wait for events
    # Resolved by whichever of handler() / wait_ws() fires first.
    fut = loop.create_future()
    def handler(deeta):
        #discord says: joined party
        print('handler:', deeta)
        secret = deeta['secret']
        # Undo the +1 shift to recover the other player's party id.
        status['party_id'] = ''.join(chr(ord(i) - 1) for i in secret)
        status['party_size'][0] += 1
        status['party_size'][1] += 1
        status['join'] = secret
        nonlocal path
        fut.set_result('joined other')
        path = status['party_id']
    async def wait_ws():
        # Any message received on the lobby socket is treated as "someone joined".
        try:
            await ws.recv()
        except websockets.ConnectionClosed:
            return
        fut.set_result('other joined')
    loop.create_task(wait_ws())
    await RPC.register_event('ACTIVITY_JOIN', handler)
    SCREEN.fill((0, 0, 0))
    mktext(SCREEN, "Waiting for party member. Go onto Discord and join someone's party or invite people to yours!", (0, 0))
    pygame.display.flip()
    # Keep pumping pygame events (so the window stays responsive) until fut resolves.
    while not fut.done():
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                # ``finally`` guarantees the exit even if closing raises.
                try:
                    await ws.close()
                    RPC.close()
                finally:
                    raise SystemExit
        await asyncio.sleep(1/FPS)
    print(status)
    if fut.result() == 'other joined': #server says: member joined
        print('other joined')
        status['party_size'][0] += 1
        status['party_size'][1] += 1
    else: #discord says: joined party
        await ws.close() #disconnect
        #join party server with secret
        ws = await websockets.connect(
            'ws://' + SERV + ':8080/party/{}'.format(path)
        )
        # Ask the server how many members the joined party has; the maximum
        # is always advertised as one more than the current size.
        async with session.get('http://' + SERV + ':8080/party_size/{}'.format(path)) as resp:
            assert resp.status == 200
            status['party_size'][0] = int(await resp.text())
            status['party_size'][1] = status['party_size'][0] + 1
    await RPC.set_activity(**status)
    #wait for events
    ready = False       # whether this client has readied up
    startable = False   # last 'startable' / 'unstartable' state sent by the server
    readied = 0         # number of ready players, as last reported by the server
    async def wait_start(fut):
        # Consume lobby messages until the server sends 'start: <id>'.
        nonlocal readied, startable
        async for msg in ws:
            if msg.startswith('start: '):
                fut.set_result(msg[len('start: '):])
                return
            elif msg in {'startable', 'unstartable'}:
                startable = msg == 'startable'
            elif msg.startswith('ready: '):
                readied = int(msg[len('ready: '):])
            else:
                # Any other message is parsed as the new party size.
                status['party_size'][0] = int(msg)
                status['party_size'][1] = status['party_size'][0] + 1
                await RPC.set_activity(**status)
            print(msg, ready, startable, readied, sep=', ')
    fut = loop.create_future()
    loop.create_task(wait_start(fut))
    while not fut.done():
        for event in pygame.event.get():
            if event.type == QUIT:
                try:
                    await ws.close()
                    RPC.close()
                finally:
                    raise SystemExit
            if event.type == KEYDOWN:
                if event.key == K_SPACE:
                    # First press readies up; once ready, Space starts the
                    # game if the server has marked it startable.
                    if not ready:
                        ready = True
                        await ws.send('ready')
                    elif startable:
                        await ws.send('start')
                elif event.key == K_ESCAPE:
                    if ready:
                        ready = False
                        await ws.send('unready')
        SCREEN.fill((0, 0, 0))
        if ready:
            if startable:
                text = 'Press Space to start the game!'
            elif status['party_size'][0] == readied:
                text = 'Waiting for party leader to start the game...'
            else:
                # The backslash continues the string literal itself, so any
                # indentation on the next line would become part of the message.
                text = 'Waiting for {} players to ready up... \
Press Escape to unready if needed'.format(status['party_size'][0] - readied)
        else:
            text = 'Press Space to ready up!'
        mktext(SCREEN, text, (0, 0))
        pygame.display.flip()
        await asyncio.sleep(1/FPS)
    await ws.close()
    await session.close()
    # Leaving the lobby: reset the party size and stop advertising a join secret.
    status['party_size'] = [1, 1]
    del status['join']
    #join game room
    return fut.result()
def getpbyid(gp, pid):
    """Return the first member of *gp* whose ``id`` equals *pid*, else ``None``."""
    matches = (member for member in gp if member.id == pid)
    return next(matches, None)
async def _send_ignoring_close(ws, payload):
    """Send *payload* on *ws*, dropping it silently if the socket is closed."""
    with suppress(websockets.ConnectionClosed):
        await ws.send(payload)


async def game(ws):
    """Run the frame loop of an online game until the local snake dies.

    Each frame: update the window caption with the scores, translate arrow
    keys into ``'d'`` (direction) messages, refresh Discord presence once
    every ``FPS`` frames, advance the sprites, and redraw.

    Args:
        ws: game websocket used for outgoing messages; it is closed when the
            local snake hits a trail segment.

    Raises:
        SystemExit: when the window is closed or Escape is pressed.
    """
    global frames
    status['start'] = int(time.time())
    # key -> (requested direction, direction that would be a 180-degree turn)
    turns = {
        K_UP: (SN_U, SN_D),
        K_DOWN: (SN_D, SN_U),
        K_LEFT: (SN_L, SN_R),
        K_RIGHT: (SN_R, SN_L),
    }
    # Strong references to in-flight send tasks: the event loop itself only
    # keeps weak references to tasks.
    in_flight = set()
    while True:
        lengths = {snak.id: snak.len for snak in snake}
        scorestr = ' '.join(
            'Snake {}: {};'.format(sid, length)
            for sid, length in sorted(lengths.items(), key=lambda item: item[0])
        )
        pygame.display.set_caption(scorestr)
        for e in pygame.event.get():
            if e.type == QUIT or (e.type == KEYDOWN and e.key == K_ESCAPE):
                # ``finally`` guarantees the exit even if closing raises.
                try:
                    RPC.close()
                    await ws.close()
                finally:
                    raise SystemExit
            # ``meh`` only exists once the server has assigned our snake
            # (MID != -1); before that, key presses are ignored.
            if e.type == KEYDOWN and MID != -1 and e.key in turns:
                wanted, opposite = turns[e.key]
                if meh.direction != opposite:
                    # The turn is only requested here; the direction changes
                    # when sock() receives the server's 'd' message. The
                    # ConnectionClosed guard lives inside the task, because
                    # that is where ws.send() actually raises.
                    task = asyncio.create_task(_send_ignoring_close(
                        ws, m('d', wanted, meh.rect.x, meh.rect.y)
                    ))
                    in_flight.add(task)
                    task.add_done_callback(in_flight.discard)
        if frames % FPS == 0:
            # Refresh Discord presence once every FPS frames.
            status['details'] = 'Competitive' if len(snake.sprites()) > 1 else 'Solo'
            status['state'] = scorestr or 'No players'
            if RPC:
                await RPC.set_activity(**status)
        SCREEN.fill((0, 0, 0))
        snake.update()
        trail.update()
        for spr in apple:
            with suppress(websockets.ConnectionClosed):
                await spr.update(ws)
        # Touching any trail segment ends the game for the local player.
        if MID != -1 and pygame.sprite.spritecollideany(meh, trail):
            await ws.close()
            return
        snake.draw(SCREEN)
        trail.draw(SCREEN)
        apple.draw(SCREEN)
        pygame.display.flip()
        await asyncio.sleep(1/FPS)
        frames += 1
async def sock(ws):
    """Apply server messages from *ws* to the local game state.

    Every message is a JSON array ``[cmd, pid, *args]``. Commands handled:

    - ``'+'``: a snake with id *pid* joined; the first one becomes ours.
    - ``'-'``: snake *pid* left; returns when it is the local snake.
    - ``'d'``: snake *pid* turned; args are ``[direction, x, y]``.
    - ``'a'``: the apple was eaten; args are ``[new_len, x, y]``.
    - ``'b'``: the big apple was eaten; args are ``[new_len]``.
    - ``'B'``: the big apple spawned; the two elements after ``cmd`` are
      not both coordinates -- the second array element lands in *pid* and
      the remaining two are unpacked as ``x, y``.

    Returns when the connection closes or the local snake is removed.
    """
    global meh, MID
    try:
        async for msg in ws:
            print(msg)
            msg = n(msg)
            cmd, pid, *args = msg
            if cmd == '+':
                snak = Snake(pid)
                if MID == -1:
                    # The first snake announced after connecting is the local one.
                    meh = snak
                    MID = pid
                else:
                    status['party_size'][0] += 1
                    status['party_size'][1] += 1
                snake.add(snak)
            elif cmd == '-':
                snak = getpbyid(snake, pid)
                if snak:
                    snak.kill()
                status['party_size'][0] -= 1
                status['party_size'][1] -= 1
                if pid == MID:
                    return
            elif cmd == 'd':
                snak = getpbyid(snake, pid)
                if snak:
                    snak.direction = args[0]
                    # Remote snakes are snapped to the reported position; the
                    # local snake keeps its own position.
                    if pid != MID:
                        snak.rect.x, snak.rect.y = args[1:]
            elif cmd == 'a':
                # Look the apple up on every message; if none is in the group
                # there is nothing to move (previously this raised NameError
                # or reused a stale sprite).
                leapp = next(
                    (spr for spr in apple if isinstance(spr, Apple)), None
                )
                if leapp is not None:
                    leapp.rect.x, leapp.rect.y = args[1:]
                snak = getpbyid(snake, pid)
                if snak:
                    snak.len = args[0]
            elif cmd == 'b':
                thebigapple.kill()
                snak = getpbyid(snake, pid)
                if snak:
                    snak.len = args[0]
            elif cmd == 'B':
                thebigapple.rect.x, thebigapple.rect.y = args
                apple.add(thebigapple)
    except websockets.ConnectionClosed:
        return
async def main():
    """Drive the intro -> lobby -> game -> end-screen cycle.

    Returns:
        ``True`` when solo / local mode was chosen on the intro screen;
        ``None`` when the player quits from the end screen.
    """
    try:
        while True:
            MODE = await intro()
            if MODE == 1:
                return True
            elif MODE == 2 and RPC is not None:
                path = await party()
                print('path:', path)
                status['party_id'] = path
                try:
                    async with websockets.connect('ws://' + SERV + ':8080/game/{}'.format(path)) as s:
                        # asyncio.wait() no longer accepts bare coroutines
                        # (deprecated in 3.8, TypeError from 3.11), so wrap
                        # both in tasks first.
                        runners = [
                            asyncio.create_task(game(s)),
                            asyncio.create_task(sock(s)),
                        ]
                        _, pending = await asyncio.wait(
                            runners, return_when=asyncio.FIRST_COMPLETED
                        )
                        # Whichever finishes first ends the round; stop the other.
                        for task in pending:
                            task.cancel()
                except ConnectionRefusedError:
                    # Game server unreachable: go back to the intro screen.
                    continue
                cont = False
                SCREEN.fill((0, 0, 0))
                mktext(SCREEN, 'Game End!', (0, 0))
                mktext(SCREEN,
                       'Press Escape or close the window to quit, or'
                       ' press Space to play again.',
                       (0, BLOCH))
                pygame.display.flip()
                while not cont:
                    for event in pygame.event.get():
                        if event.type == QUIT or (event.type == KEYDOWN and event.key == K_ESCAPE):
                            return
                        if event.type == KEYDOWN and event.key == K_SPACE:
                            cont = True
                            break
                    await asyncio.sleep(1/FPS)
    finally:
        # Close the Discord client on every exit path, if it ever started.
        if RPC:
            RPC.close()
# Script entry: run main() to completion on the event loop created earlier.
try:
    # main() returns True only when solo / local mode was picked. Importing
    # the sibling ``snake`` module then executes its top-level code
    # (presumably the single-player game -- confirm). Note that the import
    # rebinds the module-level name ``snake``, which until now held the
    # sprite group.
    if loop.run_until_complete(main()):
        import snake
finally:
    # Runs on every exit path, including SystemExit raised by the coroutines.
    loop.stop()
    pygame.quit()