#!/usr/bin/env python3 # Time to live for caching. 0 to disable caching CACHE_TTL = 15 sourcequery = [ 50, # HL Opposing Force 240, # CS:S 320, # HL2 DM 440, # TF2 550, # L4D2 630, # AlienSwarm 730, # CS:GO 4000, # Garrys Mod 107410, # ARMA III # 221100, # DayZ 252490, # Rust 282440, # Quake Live 328070, # Reflex 346110 # ARK: Survival Evolved ] sourcequeryinc = [ 107410, # ARMA III 346110 # ARK: Survival Evolved ] import json import socket import struct from SteamAPI import SteamAPI if CACHE_TTL > 0: from Caching import Caching class QueryServer(): def QueryServer(ip, port, gameid): """Query a supported server. Args: ip: IP of the gameserver port: Port of the gameserver gameid: Steam gameid of running game Returns: dict() with parsed server values""" port = int(port) gameid = int(gameid) if CACHE_TTL > 0: cache = Caching.readCache('server', '%s-%s-%s' % (str(gameid), ip, str(port)), CACHE_TTL) if cache: print('From cache') return json.loads(cache) socket.setdefaulttimeout(5) if int(gameid) in sourcequery: print('Querying server...') serverdata = QueryServer._querySourceServer(ip, port, gameid) if CACHE_TTL > 0: serialized = json.dumps(serverdata) Caching.writeCache('server', '%s-%s-%s' % (str(gameid), ip, str(port)), serialized) return serverdata return None def _querySourceServer(ip, port, gameid): """Query servers using the source query protocol. Some servers listen for these queries on an higher port.""" if gameid in sourcequeryinc: port += 1 # print(ip, port, gameid) conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) message = None try: conn.connect( (ip, port) ) conn.sendall( b'\xFF\xFF\xFF\xFF\x54' + b'Source Engine Query' + b'\x00' ) message = conn.recv(4096) conn.sendall( b'\xFF\xFF\xFF\xFF\x55\xFF\xFF\xFF\xFF' ) challenge = conn.recv(1024)[5:] conn.sendall( b'\xFF\xFF\xFF\xFF\x55' + challenge ) playerdata = conn.recv(4096) except Exception as e: print('Exception in SourceQuery connection:', e) if not message: return None finally: conn.close() data = dict() # [0]-[3] -> 4*\xFF data['header'] = chr(message[4]) data['protocol'] = chr(message[5]) message = message[6:] # Servername nullterm = message.index(b'\0') data['name'] = message[:nullterm] message = message[nullterm+1:] # Mapname nullterm = message.index(b'\0') data['map'] = message[:nullterm] message = message[nullterm+1:] # Folder nullterm = message.index(b'\0') data['folder'] = message[:nullterm] message = message[nullterm+1:] # Game nullterm = message.index(b'\0') data['game'] = message[:nullterm] message = message[nullterm+1:] data['gameid'] = (message[1] << 8) + message[0] data['players'] = message[2] data['playersmax'] = message[3] data['bots'] = message[4] data['servertype'] = chr(message[5]) # dedicated/local/proxy(tv) data['enviroment'] = chr(message[6]) # windows/linux/mac data['visibility'] = message[7] # 0public 1private data['vac'] = message[8] message = message[9:] # Game nullterm = message.index(b'\0') data['gameversion'] = message[:nullterm] message = message[nullterm+1:] extradataflag = message[0] message = message[1:] if extradataflag & 0x80: data['port'] = (message[1] << 8) + message[0] message = message[2:] if extradataflag & 0x10: data['steamid'] = (message[7] << 56) + (message[6] << 48) + \ (message[5] << 40) + (message[4] << 32) + \ (message[3] << 24) + (message[2] << 16) + \ (message[1] << 8) + (message[0]) message = message[8:] if extradataflag & 0x40: data['sourcetvport'] = (message[1] << 8) + message[0] message = message[2:] nullterm = message.index(b'\0') data['sourcetvname'] = message[:nullterm] message = message[nullterm+1:] if extradataflag & 0x20: nullterm = message.index(b'\0') data['keywords'] = message[:nullterm] message = message[nullterm+1:] if extradataflag & 0x01: data['sgameid'] = (message[7] << 56) + (message[6] << 48) + \ (message[5] << 40) + (message[4] << 32) + \ (message[3] << 24) + (message[2] << 16) + \ (message[1] << 8) + (message[0]) message = message[8:] if playerdata and playerdata[4] == ord('D'): players = [] data['player_info'] = players number = playerdata[5] playerdata = playerdata[6:] index = 0 while index < number and playerdata: player = dict() player['index'] = playerdata[0] nullterm = playerdata.index(b'\0', 1) player_name = playerdata[1:nullterm] player['name'] = player_name.decode('utf-8', errors='replace') playerdata = playerdata[nullterm+1:] playerscore = (playerdata[3] << 24) + (playerdata[2] << 16) + \ (playerdata[1] << 8) + (playerdata[0]) player['score'] = playerscore playerdata = playerdata[4:] duration = int(struct.unpack('f', playerdata[0:4])[0]) player['duration'] = duration playerdata = playerdata[4:] players.append(player) index += 1 # Everything is type bytes, so convert things to utf8? for key, value in data.items(): if isinstance(value, bytes): try: value = value.decode('utf-8', errors='replace') data[key] = value except UnicodeDecodeError: data[key] = "UTF8 Error" pass return data if __name__ == "__main__": # TODO(andre): Maybe run our tests here? print('This is a module.')