#!/usr/bin/env python3
"""Query game servers that speak the Source engine (A2S) query protocol."""

import json
import socket
import struct
import time

from SteamAPI import SteamAPI

# Time to live for caching. 0 to disable caching
CACHE_TTL = 15

# Caching is only imported when it is actually used (depends on CACHE_TTL).
if CACHE_TTL > 0:
    from Caching import Caching

# Steam game ids that are queried with the source query protocol.
sourcequery = [
    50,  # HL Opposing Force
    240,  # CS:S
    320,  # HL2 DM
    440,  # TF2
    550,  # L4D2
    630,  # AlienSwarm
    730,  # CS:GO
    4000,  # Garrys Mod
    107410,  # ARMA III
    # 221100, # DayZ
    252490,  # Rust
    282440,  # Quake Live
    328070,  # Reflex
    346110,  # ARK: Survival Evolved
]

# Game ids whose query port is the game port + 1.
sourcequeryinc = [107410, 346110]  # ARMA III  # ARK: Survival Evolved

# Prefix of every unsplit packet, followed by a one byte packet type.
_PACKET_PREFIX = b"\xFF\xFF\xFF\xFF"
# \x54 A2S_INFO
# \x55 A2S_PLAYER
# \x56 A2S_RULES
_A2S_INFO_REQUEST = _PACKET_PREFIX + b"\x54" + b"Source Engine Query" + b"\x00"
_A2S_PLAYER_REQUEST = _PACKET_PREFIX + b"\x55"


def _read_cstring(buf):
    """Split a NUL-terminated string off the front of ``buf``.

    Args:
        buf: bytes starting with a NUL-terminated string

    Returns:
        (text, rest): the string decoded as UTF-8 (undecodable bytes are
        replaced) and the bytes following the terminator

    Raises:
        ValueError: if ``buf`` contains no NUL byte"""
    end = buf.index(b"\0")
    return buf[:end].decode("utf-8", errors="replace"), buf[end + 1 :]


def _parse_info(message):
    """Parse an unsplit A2S_INFO response packet.

    Args:
        message: raw packet bytes, including the 4 byte prefix

    Returns:
        dict() with the parsed server values, in wire order

    Raises:
        ValueError: if the packet is split or a string is unterminated
        IndexError, struct.error: if the packet is truncated"""
    # [0]-[3] -> 4*\xFF == Package not split
    # Otherwise we would need to stitch the parts together, which is not
    # implemented. Raised explicitly because an assert vanishes under -O.
    if message[0:4] != _PACKET_PREFIX:
        raise ValueError("split A2S_INFO responses are not supported")

    info = {
        "header": chr(message[4]),
        "protocol": chr(message[5]),
    }
    rest = message[6:]

    # Servername, mapname, folder, game
    for key in ("name", "map", "folder", "game"):
        info[key], rest = _read_cstring(rest)

    # One little-endian 16 bit id followed by seven single bytes.
    (
        gameid,
        players,
        playersmax,
        bots,
        servertype,
        environment,
        visibility,
        vac,
    ) = struct.unpack_from("<H7B", rest)
    info["gameid"] = gameid
    info["players"] = players
    info["playersmax"] = playersmax
    info["bots"] = bots
    info["servertype"] = chr(servertype)  # dedicated/local/proxy(tv)
    # The key spelling (sic) is kept: it is part of the returned/cached data.
    info["enviroment"] = chr(environment)  # windows/linux/mac
    info["visibility"] = visibility  # 0public 1private
    info["vac"] = vac
    rest = rest[9:]

    # Game version
    info["gameversion"], rest = _read_cstring(rest)

    # Each bit of the extra data flag announces one optional trailing field.
    extradataflag = rest[0]
    rest = rest[1:]

    if extradataflag & 0x80:
        info["port"] = struct.unpack_from("<H", rest)[0]
        rest = rest[2:]

    if extradataflag & 0x10:
        info["steamid"] = struct.unpack_from("<Q", rest)[0]
        rest = rest[8:]

    if extradataflag & 0x40:
        info["sourcetvport"] = struct.unpack_from("<H", rest)[0]
        rest = rest[2:]
        info["sourcetvname"], rest = _read_cstring(rest)

    if extradataflag & 0x20:
        info["keywords"], rest = _read_cstring(rest)

    if extradataflag & 0x01:
        info["sgameid"] = struct.unpack_from("<Q", rest)[0]
        rest = rest[8:]

    return info


def _parse_players(payload, count):
    """Parse up to ``count`` player records from an A2S_PLAYER payload.

    Each record is: index byte, NUL-terminated name, little-endian int32
    score, little-endian float32 duration. Parsing stops early (keeping the
    players read so far) when the payload is exhausted or truncated.

    Args:
        payload: bytes following the player count byte
        count: number of players announced by the server

    Returns:
        list of dict() with keys index, name, score and duration"""
    players = []
    offset = 0
    while len(players) < count and offset < len(payload):
        try:
            # The name starts right after the index byte.
            name_end = payload.index(b"\0", offset + 1)
            score, duration = struct.unpack_from("<if", payload, name_end + 1)
            duration = int(duration)
        except (ValueError, OverflowError, struct.error):
            # Truncated or malformed record: keep what we have.
            break
        name = payload[offset + 1 : name_end]
        players.append(
            {
                "index": payload[offset],
                "name": name.decode("utf-8", errors="replace"),
                "score": score,
                "duration": duration,
            }
        )
        # Skip the terminator (1), the score (4) and the duration (4).
        offset = name_end + 9
    return players


class QueryServer:
    """Namespace for the server query functions."""

    @staticmethod
    def QueryServer(ip, port, gameid):
        """Query a supported server.

        Results are cached for CACHE_TTL seconds when caching is enabled.

        Args:
            ip: IP of the gameserver
            port: Port of the gameserver
            gameid: Steam gameid of running game

        Returns:
            dict() with parsed server values, or None if the game is not
            supported or the query failed"""
        port = int(port)
        gameid = int(gameid)
        cache_key = "%s-%s-%s" % (gameid, ip, port)

        if CACHE_TTL > 0:
            cache = Caching.readCache("server", cache_key, CACHE_TTL)
            if cache:
                return json.loads(cache)

        # NOTE: this sets the default timeout for every socket created
        # afterwards in this process, not only the query socket.
        socket.setdefaulttimeout(5)

        if gameid not in sourcequery:
            return None

        print("Querying server...")
        try:
            serverdata = QueryServer._querySourceServer(ip, port, gameid)
        except Exception as e:
            print("Exception while querying server:", e)
            return None

        if CACHE_TTL > 0:
            # A failing cache must not throw away a successful query.
            try:
                Caching.writeCache("server", cache_key, json.dumps(serverdata))
            except Exception as e:
                print("Exception while caching server data:", e)

        return serverdata

    @staticmethod
    def _querySourceServer(ip, port, gameid):
        """Query servers using the source query protocol.

        Some servers listen for these queries on a higher port.

        Args:
            ip: IP of the gameserver
            port: Port of the gameserver (int)
            gameid: Steam gameid of running game (int)

        Returns:
            dict() with parsed server values (plus "player_info" when the
            player query succeeded), or None if the server did not answer
            the info request

        Raises:
            ValueError, IndexError, struct.error: if the info response is
            split, truncated or malformed"""
        if gameid in sourcequeryinc:
            port += 1

        starttime = time.time()
        message = None
        # Must be bound up front: the player exchange may fail after the
        # info exchange succeeded, and parsing below still inspects it.
        playerdata = None
        conn = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            conn.connect((ip, port))

            # Fetch A2S_INFO
            conn.sendall(_A2S_INFO_REQUEST)
            message = conn.recv(4096)
            if message[4] == 0x41:
                # If server responds with challenge, return it back
                conn.sendall(_A2S_INFO_REQUEST + message[5:])
                message = conn.recv(4096)

            # Request Player Info
            conn.sendall(_A2S_PLAYER_REQUEST + b"\xFF\xFF\xFF\xFF")
            challenge = conn.recv(1024)

            # Answer challenge / retrieve player info
            conn.sendall(_A2S_PLAYER_REQUEST + challenge[5:])
            playerdata = conn.recv(8192)
        except Exception as e:
            print("Exception in SourceQuery connection:", e)
            # Without an info response there is nothing to parse; with one
            # we carry on and simply omit the player list.
            if not message:
                return None
        finally:
            conn.close()

        # Covers the whole exchange above (info and player requests), not a
        # single round trip.
        latency = time.time() - starttime
        data = {"latency": int(latency * 1000)}
        data.update(_parse_info(message))

        # A player response has type 'D' followed by a player count byte.
        if playerdata and len(playerdata) > 5 and playerdata[4] == ord("D"):
            data["player_info"] = _parse_players(playerdata[6:], playerdata[5])

        return data


if __name__ == "__main__":
    # TODO(andre): Maybe run our tests here?
    print("This is a module.")