# telnet.py
import socket
import struct
import random
import threading
import time
import sys
# Listening TCP socket. SO_REUSEADDR is switched on before binding.
servsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
servsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# The port is taken from the first command-line argument when one is given,
# otherwise the server listens on 8000.
port = int(sys.argv[1]) if len(sys.argv) > 1 else 8000

# Accept connections on every interface, with a backlog of 5.
servsock.bind(('0.0.0.0', port))
servsock.listen(5)
# --- Telnet protocol bytes (command codes / option numbers, see RFC 854) ---
IAC = b"\377"       # 255: "interpret as command" prefix
DO = b"\375"        # 253: DO <option>
# NOTE: this name is misleading. The byte is 253 (DO), not 252 (the real
# Telnet WONT). The value is what the negotiation below relies on, so it is
# kept unchanged and the name survives only as a backward-compatible alias.
WONT = DO
WILL = b"\373"      # 251: WILL <option>
LINEMODE = b"\042"  # option 34
ECHO = b"\001"      # option 1
SGA = b"\003"       # option 3 (suppress go-ahead)
# Sent once per connection by init_telnet: WILL ECHO, WILL SGA, DO LINEMODE.
CHARACTER_MODE = IAC + WILL + ECHO + IAC + WILL + SGA + IAC + DO + LINEMODE

# --- ANSI / VT100 escape sequences written to the client terminal ---
CLEAR_SCREEN = b"\033[2J"
CURSOR_ORIGIN = b"\033[1;1H"
HIDE_CURSOR = b"\033[?25l"
SHOW_CURSOR = b"\033[?25h"

# --- Byte sequences received from the client for special keys ---
UP = b'\x1b[A'
DOWN = b'\x1b[B'
RIGHT = b'\x1b[C'
LEFT = b'\x1b[D'
CTRL_C = b'\x03'

SAVE_CURSOR = b"\033[s"
RESTORE_CURSOR = b"\033[u"


def MOVE_CURSOR(x, y):
    """Return the escape sequence that moves the cursor to column x, row y.

    The sequence is emitted as ``ESC [ y ; x H`` (row first, then column).
    """
    return "\033[{};{}H".format(y, x).encode('ascii')


# Cursor position query. Character.update_screen_size parses the reply, which
# it expects in the form ESC [ <row> ; <col> R.
GET_CURSOR_POS = b"\033[6n"

# --- Text attributes: bold foreground colors, reset, plain bold ---
RED = b"\033[31;1m"
GREEN = b"\033[32;1m"
YELLOW = b"\033[33;1m"
BLUE = b"\033[34;1m"
MAGENTA = b"\033[35;1m"
CYAN = b"\033[36;1m"
WHITE = b"\033[37;1m"
RESET = b"\033[0m"
BOLD = b"\033[1m"
def color_letter(letter, color):
    """Return ``letter`` wrapped in a color escape and a trailing RESET.

    ``color`` is a one-byte code: W, R, G, Y, B, M or C. Any other value
    adds no color prefix, but the RESET suffix is still appended.
    """
    palette = (
        (b'W', WHITE),
        (b'R', RED),
        (b'G', GREEN),
        (b'Y', YELLOW),
        (b'B', BLUE),
        (b'M', MAGENTA),
        (b'C', CYAN),
    )
    # First matching code wins; an unknown code yields an empty prefix.
    prefix = next((seq for code, seq in palette if color == code), b'')
    return prefix + letter + RESET
# Inclusive limits of world coordinates on both axes. World seeds its random
# letters inside this range and Character.normalize_coords clamps to it.
MIN_BOUND = -100
MAX_BOUND = 100
class World(object):
    """Shared game state: the stamped letters and the characters in play.

    ``letters`` maps a world coordinate ``(x, y)`` to ``(letter, color)``.
    ``characters`` is a plain list of registered characters. Neither
    container is protected by a lock.
    """

    def __init__(self):
        # 300 random draws; draws that land on the same coordinate collapse
        # into one entry, so the map may hold fewer than 300 dots.
        self.letters = {
            (random.randint(MIN_BOUND, MAX_BOUND),
             random.randint(MIN_BOUND, MAX_BOUND)): (b'.', b'W')
            for _ in range(300)
        }
        self.characters = []  # NOT THREAD SAFE

    def register_character(self, c):
        """Add character ``c`` to the world."""
        self.characters.append(c)

    def kill_character(self, c):
        """Remove character ``c``; raises ValueError if it is not registered."""
        self.characters.remove(c)

    def drop_letter(self, letter, color, coords):
        """Stamp ``letter`` in ``color`` at ``coords`` (any ``[x, y]`` sequence)."""
        self.letters[(coords[0], coords[1])] = (letter, color)

    def render(self, screen_size, top_left):
        """Compute what a screen of ``screen_size`` shows for this world.

        ``screen_size`` is ``(columns, rows)``. ``top_left`` is the offset
        subtracted from a screen coordinate to get the world coordinate.

        Returns ``(render_map, clear_map)``, both keyed by screen coordinate
        ``(x_, y_)``. ``render_map`` holds the bytes to draw in each occupied
        cell; ``clear_map`` holds a blank for each of those cells so a later
        frame can erase them.
        """
        # Index characters by position once instead of rescanning the whole
        # list for every screen cell. Later list entries overwrite earlier
        # ones, which matches "last character in the list wins" on a shared
        # cell.
        occupants = {}
        for c in self.characters:
            occupants[(c.coords[0], c.coords[1])] = c

        render_map = {}
        clear_map = {}
        for y_ in range(screen_size[1]):
            y = y_ - top_left[1]
            for x_ in range(screen_size[0]):
                x = x_ - top_left[0]
                cell = (x_, y_)
                world_pos = (x, y)
                if world_pos in self.letters:
                    render_map[cell] = color_letter(*self.letters[world_pos])
                    clear_map[cell] = b' '
                # A character is drawn after (and therefore over) a letter
                # occupying the same cell.
                occupant = occupants.get(world_pos)
                if occupant is not None:
                    render_map[cell] = BOLD + occupant.get_char_code() + RESET
                    clear_map[cell] = b' '
        return render_map, clear_map
def strip_datum(data):
    """Split the first input token off ``data``.

    A token is either a complete escape sequence (``ESC [`` followed by
    anything up to and including the first ASCII letter) or a single byte.

    Returns ``(token, rest)``. When ``data`` starts an escape sequence but
    contains no terminating letter, the whole of ``data`` is returned as the
    token and ``rest`` is empty.
    """
    if data[:2] == b'\x1b[' and len(data) > 2:
        # Iterating bytes yields ints, so test the numeric ranges directly.
        # (bytes(<int>) builds that many NUL bytes, which never matched a
        # letter and made every sequence swallow the rest of the input.)
        for pos in range(2, len(data)):
            byte = data[pos]
            if 65 <= byte <= 90 or 97 <= byte <= 122:
                return data[:pos + 1], data[pos + 1:]
        return data, data[len(data):]
    return data[:1], data[1:]
def parse(data):
    """Break ``data`` into a list of input tokens using ``strip_datum``.

    Empty input yields an empty list.
    """
    tokens = []
    remaining = data
    while remaining:
        head, remaining = strip_datum(remaining)
        tokens.append(head)
    return tokens
class Character(object):
    """One player's cursor in the world, plus that player's view state.

    Attributes:
        coords: ``[x, y]`` world position.
        screen_size: ``[columns, rows]`` of the player's terminal, or
            ``[0, 0]`` until a cursor-position reply has been parsed.
        clear_codes: blanks for the cells drawn in the previous frame.
        char: the glyph that will be stamped (``b'*'`` means "none selected").
        color: one-byte color code passed to ``color_letter``.
        alive: False once ``die`` has run.
    """

    def __init__(self, world):
        self.world = world
        self.coords = [0, 0]  # x, y
        self.screen_size = [0, 0]
        self.clear_codes = {}
        self.char = b'*'
        self.color = b' '
        self.alive = True
        # Register last: World.render reads coords and calls get_char_code on
        # every registered character, so the object must be fully set up
        # before the world can see it.
        self.world.register_character(self)

    def get_char_code(self):
        """Return this character's glyph wrapped in its color codes."""
        return color_letter(self.char, self.color)

    def screen_center(self):
        """Return the ``(x, y)`` screen cell at the middle of the terminal."""
        return (self.screen_size[0] // 2, self.screen_size[1] // 2)

    def top_left_coords(self):
        """Return the offset handed to ``World.render`` as ``top_left``.

        It is the screen center minus the character's world position, which
        places the character at the center of its own screen.
        """
        center = self.screen_center()
        return (center[0] - self.coords[0], center[1] - self.coords[1])

    def die(self):
        """Mark the character dead and remove it from the world."""
        self.alive = False
        self.world.kill_character(self)

    def render(self):
        """Return the bytes that redraw this player's screen."""
        w_s, c_s = self.world.render(self.screen_size, self.top_left_coords())
        # Start from the blanks of the previous frame, then overlay the new
        # frame, so cells that are no longer occupied get erased.
        render_map = {**self.clear_codes, **w_s}
        self.clear_codes = c_s
        add_str_to_map(render_map, (1, 1), 'arrows to move')
        add_str_to_map(render_map, (1, 2), 'keys to change letter')
        add_str_to_map(render_map, (1, 3), 'space to stamp')
        return render_map_to_bytes(render_map)

    def normalize_coords(self):
        """Clamp both coordinates into ``[MIN_BOUND, MAX_BOUND]``."""
        self.coords[0] = min(max(MIN_BOUND, self.coords[0]), MAX_BOUND)
        self.coords[1] = min(max(MIN_BOUND, self.coords[1]), MAX_BOUND)

    def update(self, data):
        """Apply every token found in ``data`` via ``update_single``.

        Returns None.

        NOTE(review): callers that test this result for truthiness never see
        a change reported. Returning a bool here would start triggering
        ``broadcast_update`` from inside ``Client.callback_`` while that
        client's lock is held, so the lock ordering there should be reviewed
        together with any change to this return value.
        """
        for datum in parse(data):
            self.update_single(datum)

    def update_single(self, data):
        """Apply one input token.

        Arrow keys move the character, a lower-case ASCII letter selects the
        stamp glyph, ``*`` deselects it, an upper-case ASCII letter is stored
        as the color code, and space stamps the current glyph (a blank when
        none is selected) at the current position.

        Returns True when the token was recognized, False otherwise.
        """
        if data == UP:
            self.coords[1] -= 1
        elif data == DOWN:
            self.coords[1] += 1
        elif data == RIGHT:
            self.coords[0] += 1
        elif data == LEFT:
            self.coords[0] -= 1
        elif len(data) == 1 and 97 <= ord(data) <= 122:  # a-z
            self.char = data
        elif data == b'*':
            self.char = b'*'
        elif len(data) == 1 and 65 <= ord(data) <= 90:  # A-Z
            self.color = data
        elif data == b' ':
            letter = b' ' if self.char == b'*' else self.char
            self.world.drop_letter(letter, self.color, self.coords)
        else:
            return False
        self.normalize_coords()
        return True

    def update_screen_size(self, data):
        """Try to read ``data`` as a cursor-position reply.

        The reply is expected as two leading bytes, ``<row>;<col>``, then
        ``R``. On success ``screen_size`` becomes ``[col, row]`` and True is
        returned. Anything else (including non-bytes input such as ``''``)
        leaves the state untouched and returns False.
        """
        if not isinstance(data, (bytes, bytearray)) or data[-1:] != b'R':
            return False
        try:
            fields = [int(i) for i in data[2:-1].decode('ascii').split(';')]
        except ValueError:  # also covers UnicodeDecodeError
            return False
        if len(fields) != 2:
            return False
        self.screen_size = fields[::-1]
        return True
# The single world instance; Client passes it to every Character it creates.
w = World()
class TSList(object):
    """A small list wrapper whose operations are serialized by an RLock."""

    def __init__(self):
        self.items = []
        self.lock = threading.RLock()

    def append(self, client):
        """Add ``client`` to the end of the list."""
        with self.lock:
            self.items.append(client)

    def remove(self, client):
        """Remove ``client`` if present; do nothing otherwise."""
        with self.lock:
            try:
                self.items.remove(client)
            except ValueError:
                pass

    def __len__(self):
        with self.lock:
            return len(self.items)

    def foreach(self, callback):
        """Call ``callback(item)`` for every item currently in the list.

        The items are copied under the lock and the callbacks run on the
        copy, outside the lock. A callback may therefore add or remove items
        without making the iteration skip entries, and other threads are not
        blocked while callbacks run.
        """
        with self.lock:
            snapshot = list(self.items)
        for item in snapshot:
            callback(item)
# Every currently connected Client: appended by the accept loop, removed by
# Client.die.
g_clients = TSList()
# TODO remove contiguous movement
def render_map_to_bytes(render_map):
    """Serialize ``render_map`` into one byte string for the terminal.

    ``render_map`` maps ``(x, y)`` screen cells to the bytes to draw there.
    Each entry becomes a cursor move to that cell followed by its bytes, in
    the map's iteration order. An empty map yields ``b''``.
    """
    # Join once instead of growing a bytes object cell by cell.
    return b''.join(
        MOVE_CURSOR(*cell) + glyph for cell, glyph in render_map.items()
    )
def add_str_to_map(m, coord, s):
    """Write string ``s`` into map ``m`` horizontally, starting at ``coord``.

    Each character is ASCII-encoded and stored under ``(x + offset, y)``,
    overwriting whatever the map already held for that cell.
    """
    start_x, row = coord[0], coord[1]
    for offset, ch in enumerate(s):
        m[(start_x + offset, row)] = ch.encode('ascii')
class SafeSocket(object):
    """Socket wrapper whose send/recv close the socket instead of raising.

    ``alive`` is True until the first failure or an explicit ``close``.
    """

    def __init__(self, sock):
        self.alive = True
        self.sock = sock
        # one minute of inactivity kills the socket
        self.sock.settimeout(60.0)

    def send(self, payload):
        """Send all of ``payload``; on a socket error close the connection."""
        try:
            self.sock.sendall(payload)
        except OSError:
            self.close()

    def recv(self):
        """Receive up to 256 bytes.

        Returns the received bytes. On a socket error, a timeout, or when the
        peer has closed the connection (empty read), the socket is closed and
        the empty string ``''`` is returned.
        """
        try:
            r = self.sock.recv(256)
        except OSError:
            r = b''
        # Explicit check rather than assert, so end-of-stream is still
        # detected when Python runs with assertions disabled.
        if r == b'':
            self.close()
            return ''
        return r

    def close(self):
        """Mark the socket dead and release it; safe to call repeatedly."""
        self.alive = False
        try:
            self.sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            # Already disconnected or already closed; still close below.
            pass
        try:
            self.sock.close()
        except OSError:
            pass
class Client(object):
    """One connected player: a SafeSocket, its address and its Character."""

    def __init__(self, sock, addr):
        self.sock = sock
        self.addr = addr
        self.sock.send(CLEAR_SCREEN)
        self.sock.send(CURSOR_ORIGIN)
        self.sock.send(HIDE_CURSOR)
        self.c = Character(w)
        # Move the cursor far past any real screen edge and ask where it
        # ended up; the reply is parsed by Character.update_screen_size.
        self.sock.send(MOVE_CURSOR(5000, 5000))
        self.sock.send(GET_CURSOR_POS)
        self.lock = threading.RLock()
        self.alive = True

    def callback_(self, data):
        """Handle one chunk of input; the caller must hold ``self.lock``."""
        if data == CTRL_C:
            self.die()
            return
        if self.c.update_screen_size(data):
            self.sock.send(CLEAR_SCREEN)
            self.sock.send(self.c.render())
            return
        if self.c.screen_size == [0, 0]:
            # Terminal size still unknown: ask again.
            self.sock.send(MOVE_CURSOR(5000, 5000))
            self.sock.send(GET_CURSOR_POS)
        # NOTE(review): Character.update as written has no return statement,
        # so this branch does not currently fire.
        if self.c.update(data):
            broadcast_update()
        self.sock.send(self.c.render())

    def callback(self, data):
        """Handle one chunk of input while holding this client's lock."""
        # The context manager releases the lock even if callback_ raises.
        with self.lock:
            self.callback_(data)

    def die(self):
        """Tear the client down: character, socket and registry entry.

        Only the first call does any work; later calls return immediately.
        """
        if not self.alive:
            return
        self.alive = False
        if self.c.alive:
            self.c.die()
        if self.sock.alive:
            self.sock.send(CLEAR_SCREEN)
            self.sock.send(SHOW_CURSOR)
            self.sock.close()
        print('{} disconnected.'.format(self.addr))
        g_clients.remove(self)
def broadcast_update():
    """Run every registered client's callback with an empty input string."""
    def poke(client):
        client.callback('')

    g_clients.foreach(poke)
def init_telnet(client):
    """Send the character-mode negotiation and consume the client's reply.

    Reads one byte at a time from the raw socket until the 0xF0 (SE) byte is
    seen, then reads three more bytes and returns. If the peer closes the
    connection, or the read fails or times out, the client's SafeSocket is
    closed and the function returns, so the caller sees
    ``client.sock.alive`` as False.
    """
    client.sock.send(CHARACTER_MODE)
    raw = client.sock.sock
    try:
        while True:
            c = raw.recv(1)
            if c == b'':
                # Peer closed the connection; without this check the loop
                # would spin forever on empty reads.
                client.sock.close()
                return
            if c == b'\xf0':  # SE
                raw.recv(3)  # IAC DONT ECHO
                return
    except OSError:
        # Includes the socket timeout set by SafeSocket.
        client.sock.close()
def socket_listener(client):
    """Thread body for one client: negotiate, pump input, then clean up.

    Input is read and passed to ``client.callback`` until the client or its
    socket is no longer alive. An exception raised by the callback is
    reported and ends the session. Cleanup (``client.die``, a broadcast to
    the remaining clients, and the client count message) runs even if the
    negotiation or the loop fails unexpectedly.
    """
    try:
        init_telnet(client)
        while client.alive and client.sock.alive:
            data = client.sock.recv()
            try:
                client.callback(data)
            except Exception as e:
                # Report instead of swallowing silently, then end the session.
                print('{} error: {!r}'.format(client.addr, e))
                client.die()
    finally:
        client.die()
        broadcast_update()
        print(len(g_clients), 'clients connected.')
# Accept loop: wrap each new connection, apply the admission rules, and start
# one listener thread per accepted client.
while True:
    (clientsocket, address) = servsock.accept()
    clientsocket = SafeSocket(clientsocket)
    print('connection attempt: {}...'.format(address[0]))

    # Refuse when more than 50 clients are connected, or when this address
    # already has a connection.
    reject = len(g_clients) > 50 or any(
        existing.addr == address[0] for existing in g_clients.items
    )
    if reject:
        print('refused.')
        notice = 'Too many connections! Try telnet jott.live {}. '.format(port + 1)
        clientsocket.send(notice.encode('ascii'))
        clientsocket.close()
        continue

    print('accepted.')
    client = Client(clientsocket, address[0])
    t = threading.Thread(target=socket_listener, args=(client,))
    g_clients.append(client)
    print(len(g_clients), 'clients connected.')
    t.start()