summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndré Glüpker <git@wgmd.de>2022-10-08 20:40:02 +0200
committerAndré Glüpker <git@wgmd.de>2022-10-08 20:42:27 +0200
commit75d015d3389c7a7f105c77a68291f293b5cdd4a0 (patch)
treea90f1ca1055fce340a88220bc7a76c73c3dec6d8
parent12b10b204c59ef16ca7dd793ee7724f93e8136d3 (diff)
downloadsteam-75d015d3389c7a7f105c77a68291f293b5cdd4a0.tar.gz
steam-75d015d3389c7a7f105c77a68291f293b5cdd4a0.tar.bz2
steam-75d015d3389c7a7f105c77a68291f293b5cdd4a0.zip
Format with black
-rw-r--r--Caching.py16
-rwxr-xr-xDatabase.py19
-rwxr-xr-xQueryServer.py184
-rw-r--r--SteamAPI.py206
-rwxr-xr-xmain.py202
-rw-r--r--wsgi.py3
6 files changed, 379 insertions, 251 deletions
diff --git a/Caching.py b/Caching.py
index 7eee817..e187754 100644
--- a/Caching.py
+++ b/Caching.py
@@ -2,14 +2,15 @@
import os
import time
+
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
-CACHE_DIR = os.path.join(THIS_DIR, 'cache')
+CACHE_DIR = os.path.join(THIS_DIR, "cache")
-class Caching():
+class Caching:
def readCache(profile, steamid, timetolife):
filepath = os.path.join(CACHE_DIR, profile)
- filename = str(steamid)+'.tmp'
+ filename = str(steamid) + ".tmp"
complete = os.path.join(filepath, filename)
try:
fileinfo = os.stat(complete)
@@ -18,24 +19,25 @@ class Caching():
if fileinfo.st_mtime < (time.time() - timetolife):
return False
- with open(complete, 'rt') as cachefile:
+ with open(complete, "rt") as cachefile:
content = cachefile.read()
return content
def writeCache(profile, steamid, content):
filepath = os.path.join(CACHE_DIR, profile)
- filename = str(steamid)+'.tmp'
+ filename = str(steamid) + ".tmp"
complete = os.path.join(filepath, filename)
try:
os.makedirs(filepath, exist_ok=True)
- with open(complete, 'wt') as cachefile:
+ with open(complete, "wt") as cachefile:
cachefile.write(content)
except:
print('Could not create cache file/dir "%s".' % complete)
return False
return True
+
if __name__ == "__main__":
# TODO(andre): Maybe run tests here?
- print('This is a module.')
+ print("This is a module.")
diff --git a/Database.py b/Database.py
index 001f2b2..c2e4e4b 100755
--- a/Database.py
+++ b/Database.py
@@ -3,32 +3,35 @@
# import os
# import time
import sqlite3
+
# database = sqlite3.connect('data.db')
-class Database():
+
+class Database:
def __init__(self):
- self._connection = sqlite3.connect('data.db')
+ self._connection = sqlite3.connect("data.db")
self._cursor = self._connection.cursor()
- self._cursor.
+ # self._cursor.
# print(self._database)
print(self._cursor)
def initDB(self):
- print('If db not exists...generate schema here')
+ print("If db not exists...generate schema here")
def login(self, username, password):
- print('try to login')
- return 'userobject' or false
+ print("try to login")
+ return "userobject" or False
# Use flask session management?
# What crypt/hash to use?
def getMyTracked(self, userid):
- return 'list of tracked users'
+ return "list of tracked users"
def msg(self, msg):
print(msg)
+
if __name__ == "__main__":
database = Database()
database.initDB()
- database.msg('2')
+ database.msg("2")
diff --git a/QueryServer.py b/QueryServer.py
index fb2efd1..9227e5a 100755
--- a/QueryServer.py
+++ b/QueryServer.py
@@ -6,35 +6,33 @@ import time
CACHE_TTL = 15
sourcequery = [
- 50, # HL Opposing Force
- 240, # CS:S
- 320, # HL2 DM
- 440, # TF2
- 550, # L4D2
- 630, # AlienSwarm
- 730, # CS:GO
- 4000, # Garrys Mod
- 107410, # ARMA III
- # 221100, # DayZ
- 252490, # Rust
- 282440, # Quake Live
- 328070, # Reflex
- 346110 # ARK: Survival Evolved
- ]
-sourcequeryinc = [
- 107410, # ARMA III
- 346110 # ARK: Survival Evolved
- ]
+ 50, # HL Opposing Force
+ 240, # CS:S
+ 320, # HL2 DM
+ 440, # TF2
+ 550, # L4D2
+ 630, # AlienSwarm
+ 730, # CS:GO
+ 4000, # Garrys Mod
+ 107410, # ARMA III
+ # 221100, # DayZ
+ 252490, # Rust
+ 282440, # Quake Live
+ 328070, # Reflex
+ 346110, # ARK: Survival Evolved
+]
+sourcequeryinc = [107410, 346110] # ARMA III # ARK: Survival Evolved
import json
import socket
import struct
from SteamAPI import SteamAPI
+
if CACHE_TTL > 0:
from Caching import Caching
-class QueryServer():
+class QueryServer:
def QueryServer(ip, port, gameid):
"""Query a supported server.
@@ -45,23 +43,27 @@ class QueryServer():
Returns:
dict() with parsed server values"""
- port = int(port)
+ port = int(port)
gameid = int(gameid)
if CACHE_TTL > 0:
- cache = Caching.readCache('server', '%s-%s-%s' % (str(gameid), ip, str(port)), CACHE_TTL)
+ cache = Caching.readCache(
+ "server", "%s-%s-%s" % (str(gameid), ip, str(port)), CACHE_TTL
+ )
if cache:
return json.loads(cache)
socket.setdefaulttimeout(5)
if int(gameid) in sourcequery:
- print('Querying server...')
+ print("Querying server...")
serverdata = QueryServer._querySourceServer(ip, port, gameid)
if CACHE_TTL > 0:
serialized = json.dumps(serverdata)
- Caching.writeCache('server', '%s-%s-%s' % (str(gameid), ip, str(port)), serialized)
+ Caching.writeCache(
+ "server", "%s-%s-%s" % (str(gameid), ip, str(port)), serialized
+ )
return serverdata
@@ -78,15 +80,15 @@ class QueryServer():
message = None
try:
- conn.connect( (ip, port) )
- conn.sendall( b'\xFF\xFF\xFF\xFF\x54' + b'Source Engine Query' + b'\x00' )
+ conn.connect((ip, port))
+ conn.sendall(b"\xFF\xFF\xFF\xFF\x54" + b"Source Engine Query" + b"\x00")
message = conn.recv(4096)
- conn.sendall( b'\xFF\xFF\xFF\xFF\x55\xFF\xFF\xFF\xFF' )
+ conn.sendall(b"\xFF\xFF\xFF\xFF\x55\xFF\xFF\xFF\xFF")
challenge = conn.recv(1024)[5:]
- conn.sendall( b'\xFF\xFF\xFF\xFF\x55' + challenge )
+ conn.sendall(b"\xFF\xFF\xFF\xFF\x55" + challenge)
playerdata = conn.recv(8192)
except Exception as e:
- print('Exception in SourceQuery connection:', e)
+ print("Exception in SourceQuery connection:", e)
if not message:
return None
finally:
@@ -95,103 +97,115 @@ class QueryServer():
data = {}
latency = time.time() - starttime
- data['latency'] = int(latency * 1000)
+ data["latency"] = int(latency * 1000)
# [0]-[3] -> 4*\xFF
- data['header'] = chr(message[4])
- data['protocol'] = chr(message[5])
+ data["header"] = chr(message[4])
+ data["protocol"] = chr(message[5])
message = message[6:]
# Servername
- nullterm = message.index(b'\0')
- data['name'] = message[:nullterm]
- message = message[nullterm+1:]
+ nullterm = message.index(b"\0")
+ data["name"] = message[:nullterm]
+ message = message[nullterm + 1 :]
# Mapname
- nullterm = message.index(b'\0')
- data['map'] = message[:nullterm]
- message = message[nullterm+1:]
+ nullterm = message.index(b"\0")
+ data["map"] = message[:nullterm]
+ message = message[nullterm + 1 :]
# Folder
- nullterm = message.index(b'\0')
- data['folder'] = message[:nullterm]
- message = message[nullterm+1:]
+ nullterm = message.index(b"\0")
+ data["folder"] = message[:nullterm]
+ message = message[nullterm + 1 :]
# Game
- nullterm = message.index(b'\0')
- data['game'] = message[:nullterm]
- message = message[nullterm+1:]
-
- data['gameid'] = (message[1] << 8) + message[0]
- data['players'] = message[2]
- data['playersmax'] = message[3]
- data['bots'] = message[4]
- data['servertype'] = chr(message[5]) # dedicated/local/proxy(tv)
- data['enviroment'] = chr(message[6]) # windows/linux/mac
- data['visibility'] = message[7] # 0public 1private
- data['vac'] = message[8]
+ nullterm = message.index(b"\0")
+ data["game"] = message[:nullterm]
+ message = message[nullterm + 1 :]
+
+ data["gameid"] = (message[1] << 8) + message[0]
+ data["players"] = message[2]
+ data["playersmax"] = message[3]
+ data["bots"] = message[4]
+ data["servertype"] = chr(message[5]) # dedicated/local/proxy(tv)
+ data["enviroment"] = chr(message[6]) # windows/linux/mac
+ data["visibility"] = message[7] # 0public 1private
+ data["vac"] = message[8]
message = message[9:]
# Game
- nullterm = message.index(b'\0')
- data['gameversion'] = message[:nullterm]
- message = message[nullterm+1:]
+ nullterm = message.index(b"\0")
+ data["gameversion"] = message[:nullterm]
+ message = message[nullterm + 1 :]
extradataflag = message[0]
message = message[1:]
if extradataflag & 0x80:
- data['port'] = (message[1] << 8) + message[0]
+ data["port"] = (message[1] << 8) + message[0]
message = message[2:]
if extradataflag & 0x10:
- data['steamid'] = (message[7] << 56) + (message[6] << 48) + \
- (message[5] << 40) + (message[4] << 32) + \
- (message[3] << 24) + (message[2] << 16) + \
- (message[1] << 8) + (message[0])
+ data["steamid"] = (
+ (message[7] << 56)
+ + (message[6] << 48)
+ + (message[5] << 40)
+ + (message[4] << 32)
+ + (message[3] << 24)
+ + (message[2] << 16)
+ + (message[1] << 8)
+ + (message[0])
+ )
message = message[8:]
if extradataflag & 0x40:
- data['sourcetvport'] = (message[1] << 8) + message[0]
+ data["sourcetvport"] = (message[1] << 8) + message[0]
message = message[2:]
- nullterm = message.index(b'\0')
- data['sourcetvname'] = message[:nullterm]
- message = message[nullterm+1:]
+ nullterm = message.index(b"\0")
+ data["sourcetvname"] = message[:nullterm]
+ message = message[nullterm + 1 :]
if extradataflag & 0x20:
- nullterm = message.index(b'\0')
- data['keywords'] = message[:nullterm]
- message = message[nullterm+1:]
+ nullterm = message.index(b"\0")
+ data["keywords"] = message[:nullterm]
+ message = message[nullterm + 1 :]
if extradataflag & 0x01:
- data['sgameid'] = (message[7] << 56) + (message[6] << 48) + \
- (message[5] << 40) + (message[4] << 32) + \
- (message[3] << 24) + (message[2] << 16) + \
- (message[1] << 8) + (message[0])
+ data["sgameid"] = (
+ (message[7] << 56)
+ + (message[6] << 48)
+ + (message[5] << 40)
+ + (message[4] << 32)
+ + (message[3] << 24)
+ + (message[2] << 16)
+ + (message[1] << 8)
+ + (message[0])
+ )
message = message[8:]
- if playerdata and playerdata[4] == ord('D'):
+ if playerdata and playerdata[4] == ord("D"):
players = []
- data['player_info'] = players
+ data["player_info"] = players
number = playerdata[5]
playerdata = playerdata[6:]
index = 0
while index < number and playerdata:
player = dict()
- player['index'] = playerdata[0]
+ player["index"] = playerdata[0]
- nullterm = playerdata.index(b'\0', 1)
+ nullterm = playerdata.index(b"\0", 1)
player_name = playerdata[1:nullterm]
- player['name'] = player_name.decode('utf-8', errors='replace')
- playerdata = playerdata[nullterm+1:]
+ player["name"] = player_name.decode("utf-8", errors="replace")
+ playerdata = playerdata[nullterm + 1 :]
- playerscore = int(struct.unpack('i', playerdata[0:4])[0])
- player['score'] = playerscore
+ playerscore = int(struct.unpack("i", playerdata[0:4])[0])
+ player["score"] = playerscore
playerdata = playerdata[4:]
- duration = int(struct.unpack('f', playerdata[0:4])[0])
- player['duration'] = duration
+ duration = int(struct.unpack("f", playerdata[0:4])[0])
+ player["duration"] = duration
playerdata = playerdata[4:]
players.append(player)
@@ -201,14 +215,14 @@ class QueryServer():
for key, value in data.items():
if isinstance(value, bytes):
try:
- value = value.decode('utf-8', errors='replace')
+ value = value.decode("utf-8", errors="replace")
data[key] = value
except UnicodeDecodeError:
data[key] = "UTF8 Error"
pass
return data
+
if __name__ == "__main__":
# TODO(andre): Maybe run our tests here?
- print('This is a module.')
-
+ print("This is a module.")
diff --git a/SteamAPI.py b/SteamAPI.py
index 63b79c6..95a7105 100644
--- a/SteamAPI.py
+++ b/SteamAPI.py
@@ -16,7 +16,8 @@ CACHE = True
executor = ThreadPoolExecutor(max_workers=25)
-class SteamAPI():
+
+class SteamAPI:
def __init__(self, token):
self.token = token
@@ -27,16 +28,16 @@ class SteamAPI():
dict() with str(appid) as keys, str gamename as value
"""
if CACHE:
- cache = Caching.readCache('general', 'gamelist', 7*24*60*60)
+ cache = Caching.readCache("general", "gamelist", 7 * 24 * 60 * 60)
if cache:
return json.loads(cache)
- url = 'https://api.steampowered.com/ISteamApps/GetAppList/v2'
+ url = "https://api.steampowered.com/ISteamApps/GetAppList/v2"
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
- applist = jsondata['applist']['apps']
+ applist = jsondata["applist"]["apps"]
except KeyError:
return None
except HTTPError:
@@ -45,10 +46,10 @@ class SteamAPI():
gamelist = dict()
for app in applist:
# str keys for json conversion
- gamelist[str(app['appid'])] = app['name']
+ gamelist[str(app["appid"])] = app["name"]
if CACHE:
- cache = Caching.writeCache('general', 'gamelist', json.dumps(gamelist))
+ cache = Caching.writeCache("general", "gamelist", json.dumps(gamelist))
return gamelist
def getProfiles(self, steamids):
@@ -65,7 +66,7 @@ class SteamAPI():
if CACHE:
cachedids = []
for steamid in steamids_copy:
- cache = Caching.readCache('profile', steamid, 15)
+ cache = Caching.readCache("profile", steamid, 15)
if cache:
cachedids.append(steamid)
profile[steamid] = json.loads(cache)
@@ -74,20 +75,23 @@ class SteamAPI():
responses = []
while steamids_copy:
- steamidlist = ','.join([str(x) for x in steamids_copy[:50]])
+ steamidlist = ",".join([str(x) for x in steamids_copy[:50]])
steamids_copy = steamids_copy[50:]
- url = 'https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v2/?format=json&key=%s&steamids=%s' % (self.token, steamidlist)
+ url = (
+ "https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v2/?format=json&key=%s&steamids=%s"
+ % (self.token, steamidlist)
+ )
responses.append(executor.submit(urlopen, url))
for responseF in responses:
response = responseF.result()
if response.status != 200:
continue
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
- for player in jsondata['response']['players']:
- currentid = player['steamid']
+ for player in jsondata["response"]["players"]:
+ currentid = player["steamid"]
self.sanitizePlayer(player)
profile[currentid] = player
@@ -96,7 +100,7 @@ class SteamAPI():
for steamid in steamids_copy:
if steamid not in profile:
continue
- Caching.writeCache('profile', steamid, json.dumps(profile[steamid]))
+ Caching.writeCache("profile", steamid, json.dumps(profile[steamid]))
return profile
@@ -108,7 +112,7 @@ class SteamAPI():
profiles = executor.submit(self.getProfiles, steamids).result()
for steamid in steamids:
- profiles[steamid]['friends'] = results[steamid].result()
+ profiles[steamid]["friends"] = results[steamid].result()
return profiles
@@ -121,24 +125,33 @@ class SteamAPI():
List of steamids. TODO(andre): We lose additional information here.
"""
if CACHE:
- cache = Caching.readCache('friends', steamid, 15*60)
+ cache = Caching.readCache("friends", steamid, 15 * 60)
if cache:
return json.loads(cache)
- url = 'https://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key=%s&steamid=%s&relationship=friend' % (self.token, str(steamid))
+ url = (
+ "https://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key=%s&steamid=%s&relationship=friend"
+ % (self.token, str(steamid))
+ )
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
except HTTPError:
# f.e. profile is private
jsondata = None
friends = []
- if jsondata and 'friendslist' in jsondata and 'friends' in jsondata['friendslist']:
- friends = [friend['steamid'] for friend in jsondata['friendslist']['friends']]
+ if (
+ jsondata
+ and "friendslist" in jsondata
+ and "friends" in jsondata["friendslist"]
+ ):
+ friends = [
+ friend["steamid"] for friend in jsondata["friendslist"]["friends"]
+ ]
if CACHE:
- Caching.writeCache('friends', steamid, json.dumps(friends))
+ Caching.writeCache("friends", steamid, json.dumps(friends))
return friends
@@ -151,24 +164,27 @@ class SteamAPI():
gameName, gameVersion, availableGameStats (Achievements/Stats)
"""
if CACHE:
- cache = Caching.readCache('gameschema', gameid, 7*24*60*60)
+ cache = Caching.readCache("gameschema", gameid, 7 * 24 * 60 * 60)
if cache:
jsondata = json.loads(cache)
- return jsondata['game']
+ return jsondata["game"]
- url = 'http://api.steampowered.com/ISteamUserStats/GetSchemaForGame/v2/?key=%s&appid=%s&format=json' % (self.token, str(gameid))
+ url = (
+ "http://api.steampowered.com/ISteamUserStats/GetSchemaForGame/v2/?key=%s&appid=%s&format=json"
+ % (self.token, str(gameid))
+ )
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
except HTTPError as e:
# f.e. profile is private
jsondata = None
- if 'game' in jsondata:
+ if "game" in jsondata:
if CACHE:
- Caching.writeCache('gameschema', gameid, json.dumps(jsondata))
- return jsondata['game']
+ Caching.writeCache("gameschema", gameid, json.dumps(jsondata))
+ return jsondata["game"]
return None
def getPlayerGames(self, steamid):
@@ -180,24 +196,30 @@ class SteamAPI():
Tuple with (number of games, gameinfo [appid, name, playtime_2weeks, playtime_forever, icons])
"""
if CACHE:
- cache = Caching.readCache('games', steamid, 60*60)
+ cache = Caching.readCache("games", steamid, 60 * 60)
if cache:
jsondata = json.loads(cache)
- return ( jsondata['response']['game_count'], jsondata['response']['games'] )
-
- url = 'http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s&include_appinfo=1&format=json' % (self.token, str(steamid))
+ return (
+ jsondata["response"]["game_count"],
+ jsondata["response"]["games"],
+ )
+
+ url = (
+ "http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s&include_appinfo=1&format=json"
+ % (self.token, str(steamid))
+ )
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
except HTTPError:
# f.e. profile is private
jsondata = None
- if 'response' in jsondata and 'games' in jsondata['response']:
+ if "response" in jsondata and "games" in jsondata["response"]:
if CACHE:
- Caching.writeCache('games', steamid, json.dumps(jsondata))
- return ( jsondata['response']['game_count'], jsondata['response']['games'] )
+ Caching.writeCache("games", steamid, json.dumps(jsondata))
+ return (jsondata["response"]["game_count"], jsondata["response"]["games"])
return None
def getUserstatsForGame(self, steamid, gameid):
@@ -210,34 +232,45 @@ class SteamAPI():
dict() with statname and value
"""
if CACHE:
- cache = Caching.readCache('usergamestats', '%s-%s' % (str(steamid), str(gameid)), 24*60*60)
+ cache = Caching.readCache(
+ "usergamestats", "%s-%s" % (str(steamid), str(gameid)), 24 * 60 * 60
+ )
if cache:
cachedata = json.loads(cache)
return cachedata
- url = 'http://api.steampowered.com/ISteamUserStats/GetUserStatsForGame/v0001/?key=%s&steamid=%s&appid=%s' % (self.token, str(steamid), str(gameid))
+ url = (
+ "http://api.steampowered.com/ISteamUserStats/GetUserStatsForGame/v0001/?key=%s&steamid=%s&appid=%s"
+ % (self.token, str(steamid), str(gameid))
+ )
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
- statslist = jsondata['playerstats']['stats']
+ statslist = jsondata["playerstats"]["stats"]
userstats = dict()
for stat in statslist:
- userstats[stat] = statslist[stat]['value']
+ userstats[stat] = statslist[stat]["value"]
except HTTPError:
# f.e. profile is private
userstats = None
if userstats:
if CACHE:
- cache = Caching.writeCache('usergamestats', '%s-%s' % (str(steamid), str(gameid)), json.dumps(userstats))
+ cache = Caching.writeCache(
+ "usergamestats",
+ "%s-%s" % (str(steamid), str(gameid)),
+ json.dumps(userstats),
+ )
return userstats
return None
def getMultipleUserUserstatsForGame(self, steamids, gameid):
futures = dict()
for steamid in steamids:
- futures[steamid] = executor.submit(self.getUserstatsForGame, steamid, gameid)
+ futures[steamid] = executor.submit(
+ self.getUserstatsForGame, steamid, gameid
+ )
result = dict()
for steamid in steamids:
@@ -254,24 +287,31 @@ class SteamAPI():
dict() with game_count and games, which contains appid + playtime_forever
"""
if CACHE:
- cache = Caching.readCache('userownedgames', '%s' % (str(steamid)), 7*24*60*60)
+ cache = Caching.readCache(
+ "userownedgames", "%s" % (str(steamid)), 7 * 24 * 60 * 60
+ )
if cache:
cachedata = json.loads(cache)
return cachedata
- url = 'http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s' % (self.token, str(steamid))
+ url = (
+ "http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s"
+ % (self.token, str(steamid))
+ )
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
- if 'response' in jsondata:
- jsondata = jsondata['response']
+ if "response" in jsondata:
+ jsondata = jsondata["response"]
except HTTPError:
jsondata = None
if jsondata:
if CACHE:
- cache = Caching.writeCache('userownedgames', '%s' % (str(steamid)), json.dumps(jsondata))
+ cache = Caching.writeCache(
+ "userownedgames", "%s" % (str(steamid)), json.dumps(jsondata)
+ )
return jsondata
return None
@@ -291,24 +331,32 @@ class SteamAPI():
"""
if CACHE:
- cache = Caching.readCache('serverupdate', '%s-%s' % (str(gameid), str(gameversion)), 60*60)
+ cache = Caching.readCache(
+ "serverupdate", "%s-%s" % (str(gameid), str(gameversion)), 60 * 60
+ )
if cache:
cachedata = json.loads(cache)
return cachedata
- url = 'http://api.steampowered.com/ISteamApps/UpToDateCheck/v1?appId={0}&version={1}'.format(gameid, gameversion)
+ url = "http://api.steampowered.com/ISteamApps/UpToDateCheck/v1?appId={0}&version={1}".format(
+ gameid, gameversion
+ )
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
- if 'response' in jsondata:
- jsondata = jsondata['response']
+ if "response" in jsondata:
+ jsondata = jsondata["response"]
except HTTPError:
jsondata = None
if jsondata:
if CACHE:
- cache = Caching.writeCache('serverupdate', '%s-%s' % (str(gameid), str(gameversion)), json.dumps(jsondata))
+ cache = Caching.writeCache(
+ "serverupdate",
+ "%s-%s" % (str(gameid), str(gameversion)),
+ json.dumps(jsondata),
+ )
return jsondata
return None
@@ -326,33 +374,43 @@ class SteamAPI():
def getDataForPremadefinder(self, steamids):
futures = dict()
- futures['profiles'] = executor.submit(self.getProfiles, steamids)
- futures['ownedGames'] = dict()
- futures['userstats'] = dict()
- futures['friends'] = dict()
+ futures["profiles"] = executor.submit(self.getProfiles, steamids)
+ futures["ownedGames"] = dict()
+ futures["userstats"] = dict()
+ futures["friends"] = dict()
for steamid in steamids:
- futures['ownedGames'][steamid] = executor.submit(self.getOwnedGames, steamid)
- futures['userstats'][steamid] = executor.submit(self.getUserstatsForGame, steamid, 730)
- futures['friends'][steamid] = executor.submit(self.getFriends, steamid)
-
- profiles = futures['profiles'].result()
+ futures["ownedGames"][steamid] = executor.submit(
+ self.getOwnedGames, steamid
+ )
+ futures["userstats"][steamid] = executor.submit(
+ self.getUserstatsForGame, steamid, 730
+ )
+ futures["friends"][steamid] = executor.submit(self.getFriends, steamid)
+
+ profiles = futures["profiles"].result()
for steamid in profiles:
- profiles[steamid]['_friends'] = futures['friends'][steamid].result()
- profiles[steamid]['_userstats'] = futures['userstats'][steamid].result()
- profiles[steamid]['_ownedGames'] = futures['ownedGames'][steamid].result()
- profiles[steamid]['_ownedPlayedGames'] = 'n/a'
- if profiles[steamid]['_ownedGames']:
- profiles[steamid]['_ownedPlayedGames'] = len([game for game in profiles[steamid]['_ownedGames']['games'] if game['playtime_forever'] > 0])
+ profiles[steamid]["_friends"] = futures["friends"][steamid].result()
+ profiles[steamid]["_userstats"] = futures["userstats"][steamid].result()
+ profiles[steamid]["_ownedGames"] = futures["ownedGames"][steamid].result()
+ profiles[steamid]["_ownedPlayedGames"] = "n/a"
+ if profiles[steamid]["_ownedGames"]:
+ profiles[steamid]["_ownedPlayedGames"] = len(
+ [
+ game
+ for game in profiles[steamid]["_ownedGames"]["games"]
+ if game["playtime_forever"] > 0
+ ]
+ )
return profiles
@staticmethod
def sanitizePlayer(player):
- if 'lastlogoff' not in player:
- player['lastlogoff'] = -1
+ if "lastlogoff" not in player:
+ player["lastlogoff"] = -1
+
if __name__ == "__main__":
# TODO(andre): Maybe run tests here?
- print('This is a module.')
-
+ print("This is a module.")
diff --git a/main.py b/main.py
index e39eb5f..5fd96a1 100755
--- a/main.py
+++ b/main.py
@@ -11,6 +11,7 @@ import os
import re
import sys
import time, datetime
+
# import traceback
import logging
@@ -19,13 +20,13 @@ from QueryServer import QueryServer
# Set some directories
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
-TEMPLATES = os.path.join(THIS_DIR, 'templates')
-ASSETS = os.path.join(THIS_DIR, 'assets')
-CONFIG = os.path.join(THIS_DIR, 'config')
+TEMPLATES = os.path.join(THIS_DIR, "templates")
+ASSETS = os.path.join(THIS_DIR, "assets")
+CONFIG = os.path.join(THIS_DIR, "config")
app = Flask(__name__)
steam = None
-with open(os.path.join(CONFIG, 'secrets.json'), 'rt') as secretjson:
+with open(os.path.join(CONFIG, "secrets.json"), "rt") as secretjson:
secrets = json.load(secretjson)
app.secret_key = secrets["flask_secret"]
steam = SteamAPI(secrets["steam_token"])
@@ -35,16 +36,18 @@ with open(os.path.join(CONFIG, 'secrets.json'), 'rt') as secretjson:
##################################################
intervals = (
- ('w', 604800), # 60 * 60 * 24 * 7
- ('d', 86400), # 60 * 60 * 24
- ('h', 3600), # 60 * 60
- ('m', 60),
- ('s', 1),
- )
+ ("w", 604800), # 60 * 60 * 24 * 7
+ ("d", 86400), # 60 * 60 * 24
+ ("h", 3600), # 60 * 60
+ ("m", 60),
+ ("s", 1),
+)
+
def display_time(timestamp, format="%d.%m.%Y"):
return datetime.datetime.fromtimestamp(timestamp).strftime(format)
+
def display_age(seconds, granularity=2):
result = []
for name, count in intervals:
@@ -52,50 +55,58 @@ def display_age(seconds, granularity=2):
if value:
seconds -= value * count
if value == 1:
- name = name.rstrip('s')
+ name = name.rstrip("s")
result.append("{}{}".format(int(value), name))
- return ' '.join(result[:granularity])
+ return " ".join(result[:granularity])
+
+
+app.jinja_env.filters["display_time"] = display_time
+app.jinja_env.filters["display_age"] = display_age
-app.jinja_env.filters['display_time'] = display_time
-app.jinja_env.filters['display_age'] = display_age
def grepSteamids(text):
steamids = []
- SteamIDfromText = re.findall(r'STEAM_\d:(\d):(\d+)', text)
+ SteamIDfromText = re.findall(r"STEAM_\d:(\d):(\d+)", text)
for steamid in SteamIDfromText:
steam64id = 76561197960265728 + int(steamid[0]) + (int(steamid[1]) * 2)
steamids.append(steam64id)
return steamids
+
##################################################
## Functions for different pages
##################################################
+
@app.route("/")
def main():
- return redirect(url_for('lobby'))
+ return redirect(url_for("lobby"))
+
@app.errorhandler(404)
def not_found(e):
- return render_template('error.jinja', error = 'Die angeforderte Seite konnte nicht gefunden werden.')
+ return render_template(
+ "error.jinja", error="Die angeforderte Seite konnte nicht gefunden werden."
+ )
+
@app.route("/lobby")
def lobby():
steamids = dict()
- friends = request.args.get('friends')
+ friends = request.args.get("friends")
if friends:
- friendList = friends.split(',')
+ friendList = friends.split(",")
profiles = steam.getMultipleFriends(friendList)
for steamid, profile in profiles.items():
- for friendid in profile['friends']:
+ for friendid in profile["friends"]:
if friendid in steamids:
- steamids[friendid]['main'] += ', ' + profile['personaname']
+ steamids[friendid]["main"] += ", " + profile["personaname"]
else:
- steamids[friendid] = {'main': profile['personaname']}
+ steamids[friendid] = {"main": profile["personaname"]}
else:
# Load config (steamids, names)
- with open(os.path.join(CONFIG, 'lobby.json'), 'rt') as config:
+ with open(os.path.join(CONFIG, "lobby.json"), "rt") as config:
steamids = json.load(config)
gamelist = steam.getGames()
@@ -110,16 +121,15 @@ def lobby():
serverinfo = dict()
for steamid, playerdata in steamids.items():
- if 'gameid' not in playerdata \
- or 'gameserverip' not in playerdata:
- continue
- if ':' not in playerdata['gameserverip']:
+ if "gameid" not in playerdata or "gameserverip" not in playerdata:
continue
- gameserver = playerdata['gameserverip']
+ if ":" not in playerdata["gameserverip"]:
+ continue
+ gameserver = playerdata["gameserverip"]
if gameserver not in serverinfo:
- ip, port = gameserver.split(':')
+ ip, port = gameserver.split(":")
port = int(port)
- gameid = playerdata['gameid']
+ gameid = playerdata["gameid"]
# print('Query Server:', ip, port, gameid)
server = QueryServer.QueryServer(ip, port, gameid)
# print('Response:', server)
@@ -127,33 +137,63 @@ def lobby():
serverinfo[gameserver] = server
# Sort steamids to be more appealing
- steamids = OrderedDict(sorted(steamids.items(), key = lambda player: player[1]['personaname'].lower()))
- steamids = OrderedDict(sorted(steamids.items(), reverse=True, key = lambda player: int(player[1]['lastlogoff'])))
- steamids = OrderedDict(sorted(steamids.items(), key = lambda player: (player[1]['personastate'] > 0) and player[1]['personastate'] or 10))
- steamids = OrderedDict(sorted(steamids.items(), key = lambda player: ('gameid' in player[1] and player[1]['gameid'] or "zzz")))
-
- return render_template('lobby_html.jinja',
- steamids = steamids,
- serverinfo = serverinfo,
- gamelist = gamelist,
- states = ['Offline', 'Online', 'Busy', 'Away', 'Snooze', 'Looking to trade', 'Looking to play'],
- current_time = time.time())
-
-@app.route("/premadefinder", methods=['GET', 'POST'])
+ steamids = OrderedDict(
+ sorted(steamids.items(), key=lambda player: player[1]["personaname"].lower())
+ )
+ steamids = OrderedDict(
+ sorted(
+ steamids.items(),
+ reverse=True,
+ key=lambda player: int(player[1]["lastlogoff"]),
+ )
+ )
+ steamids = OrderedDict(
+ sorted(
+ steamids.items(),
+ key=lambda player: (player[1]["personastate"] > 0)
+ and player[1]["personastate"]
+ or 10,
+ )
+ )
+ steamids = OrderedDict(
+ sorted(
+ steamids.items(),
+ key=lambda player: ("gameid" in player[1] and player[1]["gameid"] or "zzz"),
+ )
+ )
+
+ return render_template(
+ "lobby_html.jinja",
+ steamids=steamids,
+ serverinfo=serverinfo,
+ gamelist=gamelist,
+ states=[
+ "Offline",
+ "Online",
+ "Busy",
+ "Away",
+ "Snooze",
+ "Looking to trade",
+ "Looking to play",
+ ],
+ current_time=time.time(),
+ )
+
+
+@app.route("/premadefinder", methods=["GET", "POST"])
def premades():
- steamids = []
+ steamids = []
premadedata = dict()
connections = set()
- if request.method == 'POST':
- postdata = request.form['statustext']
+ if request.method == "POST":
+ postdata = request.form["statustext"]
steamids = grepSteamids(postdata)
steamids = [str(x) for x in steamids]
if len(steamids) > 50:
return render_template(
- 'error.jinja',
- error='Es sind maximal 50 Steamids erlaubt.'
+ "error.jinja", error="Es sind maximal 50 Steamids erlaubt."
)
# Ask steam about profiles
@@ -162,60 +202,70 @@ def premades():
# Add connection between friends.
# Friends are always bidirectional, so we use set and min/max to avoid duplicates
for steamid in steamids:
- for friend in premadedata[steamid]['_friends']:
+ for friend in premadedata[steamid]["_friends"]:
if friend in steamids:
friend_a = min(steamid, friend)
friend_b = max(steamid, friend)
connections.add((friend_a, friend_b))
- return render_template('premades_html.jinja',
- steamids=steamids,
- profiles=premadedata,
- current_time = time.time(),
- connections=connections
- )
+ return render_template(
+ "premades_html.jinja",
+ steamids=steamids,
+ profiles=premadedata,
+ current_time=time.time(),
+ connections=connections,
+ )
+
-@app.route("/server", methods=['GET'])
+@app.route("/server", methods=["GET"])
def server():
- with open(os.path.join(CONFIG, 'server.json'), 'rt') as config:
+ with open(os.path.join(CONFIG, "server.json"), "rt") as config:
servers = json.load(config)
executor = ThreadPoolExecutor(max_workers=10)
serverdata_ = dict()
for _, serverdata in servers.items():
- name = "{0}{1:d}{2:d}".format(serverdata['ip'], serverdata['port'], serverdata['gameid'])
+ name = "{0}{1:d}{2:d}".format(
+ serverdata["ip"], serverdata["port"], serverdata["gameid"]
+ )
serverdata_[name] = executor.submit(
QueryServer.QueryServer,
- serverdata['ip'],
- serverdata['port'],
- serverdata['gameid']
+ serverdata["ip"],
+ serverdata["port"],
+ serverdata["gameid"],
)
for _, serverdata in servers.items():
- name = "{0}{1:d}{2:d}".format(serverdata['ip'], serverdata['port'], serverdata['gameid'])
- serverdata['data'] = serverdata_[name].result()
- if serverdata['data']:
- serverdata_['u' + name] = executor.submit(
+ name = "{0}{1:d}{2:d}".format(
+ serverdata["ip"], serverdata["port"], serverdata["gameid"]
+ )
+ serverdata["data"] = serverdata_[name].result()
+ if serverdata["data"]:
+ serverdata_["u" + name] = executor.submit(
steam.getGameUpdateState,
- serverdata['data']['gameid'],
- serverdata['data']['gameversion'],
+ serverdata["data"]["gameid"],
+ serverdata["data"]["gameversion"],
)
for _, serverdata in servers.items():
- name = "{0}{1:d}{2:d}".format(serverdata['ip'], serverdata['port'], serverdata['gameid'])
- if 'u' + name in serverdata_:
- serverdata['update'] = serverdata_['u' + name].result()
+ name = "{0}{1:d}{2:d}".format(
+ serverdata["ip"], serverdata["port"], serverdata["gameid"]
+ )
+ if "u" + name in serverdata_:
+ serverdata["update"] = serverdata_["u" + name].result()
+
+ return render_template(
+ "server_html.jinja",
+ servers=servers,
+ )
- return render_template('server_html.jinja',
- servers=servers,
- )
-if __name__ == '__main__':
+if __name__ == "__main__":
# print(steam.getFriends("76561197963063991"))
# print(steam.getFriends("76561197963882989"))
- logging.basicConfig(filename='./main.log', level=logging.INFO)
+ logging.basicConfig(filename="./main.log", level=logging.INFO)
- app.config['TEMPLATES_AUTO_RELOAD'] = True
+ app.config["TEMPLATES_AUTO_RELOAD"] = True
app.run(threaded=True)
# Changelog
diff --git a/wsgi.py b/wsgi.py
index 58e4576..7cc7fdc 100644
--- a/wsgi.py
+++ b/wsgi.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
import sys
-sys.path.append('./')
+
+sys.path.append("./")
from main import app as application