summaryrefslogtreecommitdiff
path: root/SteamAPI.py
diff options
context:
space:
mode:
authorAndré Glüpker <git@wgmd.de>2022-10-08 20:40:02 +0200
committerAndré Glüpker <git@wgmd.de>2022-10-08 20:42:27 +0200
commit75d015d3389c7a7f105c77a68291f293b5cdd4a0 (patch)
treea90f1ca1055fce340a88220bc7a76c73c3dec6d8 /SteamAPI.py
parent12b10b204c59ef16ca7dd793ee7724f93e8136d3 (diff)
downloadsteam-75d015d3389c7a7f105c77a68291f293b5cdd4a0.tar.gz
steam-75d015d3389c7a7f105c77a68291f293b5cdd4a0.tar.bz2
steam-75d015d3389c7a7f105c77a68291f293b5cdd4a0.zip
Format with black
Diffstat (limited to 'SteamAPI.py')
-rw-r--r--SteamAPI.py206
1 files changed, 132 insertions, 74 deletions
diff --git a/SteamAPI.py b/SteamAPI.py
index 63b79c6..95a7105 100644
--- a/SteamAPI.py
+++ b/SteamAPI.py
@@ -16,7 +16,8 @@ CACHE = True
executor = ThreadPoolExecutor(max_workers=25)
-class SteamAPI():
+
+class SteamAPI:
def __init__(self, token):
self.token = token
@@ -27,16 +28,16 @@ class SteamAPI():
dict() with str(appid) as keys, str gamename as value
"""
if CACHE:
- cache = Caching.readCache('general', 'gamelist', 7*24*60*60)
+ cache = Caching.readCache("general", "gamelist", 7 * 24 * 60 * 60)
if cache:
return json.loads(cache)
- url = 'https://api.steampowered.com/ISteamApps/GetAppList/v2'
+ url = "https://api.steampowered.com/ISteamApps/GetAppList/v2"
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
- applist = jsondata['applist']['apps']
+ applist = jsondata["applist"]["apps"]
except KeyError:
return None
except HTTPError:
@@ -45,10 +46,10 @@ class SteamAPI():
gamelist = dict()
for app in applist:
# str keys for json conversion
- gamelist[str(app['appid'])] = app['name']
+ gamelist[str(app["appid"])] = app["name"]
if CACHE:
- cache = Caching.writeCache('general', 'gamelist', json.dumps(gamelist))
+ cache = Caching.writeCache("general", "gamelist", json.dumps(gamelist))
return gamelist
def getProfiles(self, steamids):
@@ -65,7 +66,7 @@ class SteamAPI():
if CACHE:
cachedids = []
for steamid in steamids_copy:
- cache = Caching.readCache('profile', steamid, 15)
+ cache = Caching.readCache("profile", steamid, 15)
if cache:
cachedids.append(steamid)
profile[steamid] = json.loads(cache)
@@ -74,20 +75,23 @@ class SteamAPI():
responses = []
while steamids_copy:
- steamidlist = ','.join([str(x) for x in steamids_copy[:50]])
+ steamidlist = ",".join([str(x) for x in steamids_copy[:50]])
steamids_copy = steamids_copy[50:]
- url = 'https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v2/?format=json&key=%s&steamids=%s' % (self.token, steamidlist)
+ url = (
+ "https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v2/?format=json&key=%s&steamids=%s"
+ % (self.token, steamidlist)
+ )
responses.append(executor.submit(urlopen, url))
for responseF in responses:
response = responseF.result()
if response.status != 200:
continue
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
- for player in jsondata['response']['players']:
- currentid = player['steamid']
+ for player in jsondata["response"]["players"]:
+ currentid = player["steamid"]
self.sanitizePlayer(player)
profile[currentid] = player
@@ -96,7 +100,7 @@ class SteamAPI():
for steamid in steamids_copy:
if steamid not in profile:
continue
- Caching.writeCache('profile', steamid, json.dumps(profile[steamid]))
+ Caching.writeCache("profile", steamid, json.dumps(profile[steamid]))
return profile
@@ -108,7 +112,7 @@ class SteamAPI():
profiles = executor.submit(self.getProfiles, steamids).result()
for steamid in steamids:
- profiles[steamid]['friends'] = results[steamid].result()
+ profiles[steamid]["friends"] = results[steamid].result()
return profiles
@@ -121,24 +125,33 @@ class SteamAPI():
List of steamids. TODO(andre): We lose additional information here.
"""
if CACHE:
- cache = Caching.readCache('friends', steamid, 15*60)
+ cache = Caching.readCache("friends", steamid, 15 * 60)
if cache:
return json.loads(cache)
- url = 'https://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key=%s&steamid=%s&relationship=friend' % (self.token, str(steamid))
+ url = (
+ "https://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key=%s&steamid=%s&relationship=friend"
+ % (self.token, str(steamid))
+ )
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
except HTTPError:
# f.e. profile is private
jsondata = None
friends = []
- if jsondata and 'friendslist' in jsondata and 'friends' in jsondata['friendslist']:
- friends = [friend['steamid'] for friend in jsondata['friendslist']['friends']]
+ if (
+ jsondata
+ and "friendslist" in jsondata
+ and "friends" in jsondata["friendslist"]
+ ):
+ friends = [
+ friend["steamid"] for friend in jsondata["friendslist"]["friends"]
+ ]
if CACHE:
- Caching.writeCache('friends', steamid, json.dumps(friends))
+ Caching.writeCache("friends", steamid, json.dumps(friends))
return friends
@@ -151,24 +164,27 @@ class SteamAPI():
gameName, gameVersion, availableGameStats (Achievements/Stats)
"""
if CACHE:
- cache = Caching.readCache('gameschema', gameid, 7*24*60*60)
+ cache = Caching.readCache("gameschema", gameid, 7 * 24 * 60 * 60)
if cache:
jsondata = json.loads(cache)
- return jsondata['game']
+ return jsondata["game"]
- url = 'http://api.steampowered.com/ISteamUserStats/GetSchemaForGame/v2/?key=%s&appid=%s&format=json' % (self.token, str(gameid))
+ url = (
+ "http://api.steampowered.com/ISteamUserStats/GetSchemaForGame/v2/?key=%s&appid=%s&format=json"
+ % (self.token, str(gameid))
+ )
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
except HTTPError as e:
# f.e. profile is private
jsondata = None
- if 'game' in jsondata:
+ if "game" in jsondata:
if CACHE:
- Caching.writeCache('gameschema', gameid, json.dumps(jsondata))
- return jsondata['game']
+ Caching.writeCache("gameschema", gameid, json.dumps(jsondata))
+ return jsondata["game"]
return None
def getPlayerGames(self, steamid):
@@ -180,24 +196,30 @@ class SteamAPI():
Tuple with (number of games, gameinfo [appid, name, playtime_2weeks, playtime_forever, icons])
"""
if CACHE:
- cache = Caching.readCache('games', steamid, 60*60)
+ cache = Caching.readCache("games", steamid, 60 * 60)
if cache:
jsondata = json.loads(cache)
- return ( jsondata['response']['game_count'], jsondata['response']['games'] )
-
- url = 'http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s&include_appinfo=1&format=json' % (self.token, str(steamid))
+ return (
+ jsondata["response"]["game_count"],
+ jsondata["response"]["games"],
+ )
+
+ url = (
+ "http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s&include_appinfo=1&format=json"
+ % (self.token, str(steamid))
+ )
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
except HTTPError:
# f.e. profile is private
jsondata = None
- if 'response' in jsondata and 'games' in jsondata['response']:
+ if "response" in jsondata and "games" in jsondata["response"]:
if CACHE:
- Caching.writeCache('games', steamid, json.dumps(jsondata))
- return ( jsondata['response']['game_count'], jsondata['response']['games'] )
+ Caching.writeCache("games", steamid, json.dumps(jsondata))
+ return (jsondata["response"]["game_count"], jsondata["response"]["games"])
return None
def getUserstatsForGame(self, steamid, gameid):
@@ -210,34 +232,45 @@ class SteamAPI():
dict() with statname and value
"""
if CACHE:
- cache = Caching.readCache('usergamestats', '%s-%s' % (str(steamid), str(gameid)), 24*60*60)
+ cache = Caching.readCache(
+ "usergamestats", "%s-%s" % (str(steamid), str(gameid)), 24 * 60 * 60
+ )
if cache:
cachedata = json.loads(cache)
return cachedata
- url = 'http://api.steampowered.com/ISteamUserStats/GetUserStatsForGame/v0001/?key=%s&steamid=%s&appid=%s' % (self.token, str(steamid), str(gameid))
+ url = (
+ "http://api.steampowered.com/ISteamUserStats/GetUserStatsForGame/v0001/?key=%s&steamid=%s&appid=%s"
+ % (self.token, str(steamid), str(gameid))
+ )
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
- statslist = jsondata['playerstats']['stats']
+ statslist = jsondata["playerstats"]["stats"]
userstats = dict()
for stat in statslist:
- userstats[stat] = statslist[stat]['value']
+ userstats[stat] = statslist[stat]["value"]
except HTTPError:
# f.e. profile is private
userstats = None
if userstats:
if CACHE:
- cache = Caching.writeCache('usergamestats', '%s-%s' % (str(steamid), str(gameid)), json.dumps(userstats))
+ cache = Caching.writeCache(
+ "usergamestats",
+ "%s-%s" % (str(steamid), str(gameid)),
+ json.dumps(userstats),
+ )
return userstats
return None
def getMultipleUserUserstatsForGame(self, steamids, gameid):
futures = dict()
for steamid in steamids:
- futures[steamid] = executor.submit(self.getUserstatsForGame, steamid, gameid)
+ futures[steamid] = executor.submit(
+ self.getUserstatsForGame, steamid, gameid
+ )
result = dict()
for steamid in steamids:
@@ -254,24 +287,31 @@ class SteamAPI():
dict() with game_count and games, which contains appid + playtime_forever
"""
if CACHE:
- cache = Caching.readCache('userownedgames', '%s' % (str(steamid)), 7*24*60*60)
+ cache = Caching.readCache(
+ "userownedgames", "%s" % (str(steamid)), 7 * 24 * 60 * 60
+ )
if cache:
cachedata = json.loads(cache)
return cachedata
- url = 'http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s' % (self.token, str(steamid))
+ url = (
+ "http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s"
+ % (self.token, str(steamid))
+ )
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
- if 'response' in jsondata:
- jsondata = jsondata['response']
+ if "response" in jsondata:
+ jsondata = jsondata["response"]
except HTTPError:
jsondata = None
if jsondata:
if CACHE:
- cache = Caching.writeCache('userownedgames', '%s' % (str(steamid)), json.dumps(jsondata))
+ cache = Caching.writeCache(
+ "userownedgames", "%s" % (str(steamid)), json.dumps(jsondata)
+ )
return jsondata
return None
@@ -291,24 +331,32 @@ class SteamAPI():
"""
if CACHE:
- cache = Caching.readCache('serverupdate', '%s-%s' % (str(gameid), str(gameversion)), 60*60)
+ cache = Caching.readCache(
+ "serverupdate", "%s-%s" % (str(gameid), str(gameversion)), 60 * 60
+ )
if cache:
cachedata = json.loads(cache)
return cachedata
- url = 'http://api.steampowered.com/ISteamApps/UpToDateCheck/v1?appId={0}&version={1}'.format(gameid, gameversion)
+ url = "http://api.steampowered.com/ISteamApps/UpToDateCheck/v1?appId={0}&version={1}".format(
+ gameid, gameversion
+ )
try:
response = urlopen(url)
- data = response.read().decode('utf-8')
+ data = response.read().decode("utf-8")
jsondata = json.loads(data)
- if 'response' in jsondata:
- jsondata = jsondata['response']
+ if "response" in jsondata:
+ jsondata = jsondata["response"]
except HTTPError:
jsondata = None
if jsondata:
if CACHE:
- cache = Caching.writeCache('serverupdate', '%s-%s' % (str(gameid), str(gameversion)), json.dumps(jsondata))
+ cache = Caching.writeCache(
+ "serverupdate",
+ "%s-%s" % (str(gameid), str(gameversion)),
+ json.dumps(jsondata),
+ )
return jsondata
return None
@@ -326,33 +374,43 @@ class SteamAPI():
def getDataForPremadefinder(self, steamids):
futures = dict()
- futures['profiles'] = executor.submit(self.getProfiles, steamids)
- futures['ownedGames'] = dict()
- futures['userstats'] = dict()
- futures['friends'] = dict()
+ futures["profiles"] = executor.submit(self.getProfiles, steamids)
+ futures["ownedGames"] = dict()
+ futures["userstats"] = dict()
+ futures["friends"] = dict()
for steamid in steamids:
- futures['ownedGames'][steamid] = executor.submit(self.getOwnedGames, steamid)
- futures['userstats'][steamid] = executor.submit(self.getUserstatsForGame, steamid, 730)
- futures['friends'][steamid] = executor.submit(self.getFriends, steamid)
-
- profiles = futures['profiles'].result()
+ futures["ownedGames"][steamid] = executor.submit(
+ self.getOwnedGames, steamid
+ )
+ futures["userstats"][steamid] = executor.submit(
+ self.getUserstatsForGame, steamid, 730
+ )
+ futures["friends"][steamid] = executor.submit(self.getFriends, steamid)
+
+ profiles = futures["profiles"].result()
for steamid in profiles:
- profiles[steamid]['_friends'] = futures['friends'][steamid].result()
- profiles[steamid]['_userstats'] = futures['userstats'][steamid].result()
- profiles[steamid]['_ownedGames'] = futures['ownedGames'][steamid].result()
- profiles[steamid]['_ownedPlayedGames'] = 'n/a'
- if profiles[steamid]['_ownedGames']:
- profiles[steamid]['_ownedPlayedGames'] = len([game for game in profiles[steamid]['_ownedGames']['games'] if game['playtime_forever'] > 0])
+ profiles[steamid]["_friends"] = futures["friends"][steamid].result()
+ profiles[steamid]["_userstats"] = futures["userstats"][steamid].result()
+ profiles[steamid]["_ownedGames"] = futures["ownedGames"][steamid].result()
+ profiles[steamid]["_ownedPlayedGames"] = "n/a"
+ if profiles[steamid]["_ownedGames"]:
+ profiles[steamid]["_ownedPlayedGames"] = len(
+ [
+ game
+ for game in profiles[steamid]["_ownedGames"]["games"]
+ if game["playtime_forever"] > 0
+ ]
+ )
return profiles
@staticmethod
def sanitizePlayer(player):
- if 'lastlogoff' not in player:
- player['lastlogoff'] = -1
+ if "lastlogoff" not in player:
+ player["lastlogoff"] = -1
+
if __name__ == "__main__":
# TODO(andre): Maybe run tests here?
- print('This is a module.')
-
+ print("This is a module.")