From 75d015d3389c7a7f105c77a68291f293b5cdd4a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Gl=C3=BCpker?= Date: Sat, 8 Oct 2022 20:40:02 +0200 Subject: Format with black --- SteamAPI.py | 206 ++++++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 132 insertions(+), 74 deletions(-) (limited to 'SteamAPI.py') diff --git a/SteamAPI.py b/SteamAPI.py index 63b79c6..95a7105 100644 --- a/SteamAPI.py +++ b/SteamAPI.py @@ -16,7 +16,8 @@ CACHE = True executor = ThreadPoolExecutor(max_workers=25) -class SteamAPI(): + +class SteamAPI: def __init__(self, token): self.token = token @@ -27,16 +28,16 @@ class SteamAPI(): dict() with str(appid) as keys, str gamename as value """ if CACHE: - cache = Caching.readCache('general', 'gamelist', 7*24*60*60) + cache = Caching.readCache("general", "gamelist", 7 * 24 * 60 * 60) if cache: return json.loads(cache) - url = 'https://api.steampowered.com/ISteamApps/GetAppList/v2' + url = "https://api.steampowered.com/ISteamApps/GetAppList/v2" try: response = urlopen(url) - data = response.read().decode('utf-8') + data = response.read().decode("utf-8") jsondata = json.loads(data) - applist = jsondata['applist']['apps'] + applist = jsondata["applist"]["apps"] except KeyError: return None except HTTPError: @@ -45,10 +46,10 @@ class SteamAPI(): gamelist = dict() for app in applist: # str keys for json conversion - gamelist[str(app['appid'])] = app['name'] + gamelist[str(app["appid"])] = app["name"] if CACHE: - cache = Caching.writeCache('general', 'gamelist', json.dumps(gamelist)) + cache = Caching.writeCache("general", "gamelist", json.dumps(gamelist)) return gamelist def getProfiles(self, steamids): @@ -65,7 +66,7 @@ class SteamAPI(): if CACHE: cachedids = [] for steamid in steamids_copy: - cache = Caching.readCache('profile', steamid, 15) + cache = Caching.readCache("profile", steamid, 15) if cache: cachedids.append(steamid) profile[steamid] = json.loads(cache) @@ -74,20 +75,23 @@ class SteamAPI(): responses = [] while steamids_copy: - steamidlist = ','.join([str(x) for x in steamids_copy[:50]]) + steamidlist = ",".join([str(x) for x in steamids_copy[:50]]) steamids_copy = steamids_copy[50:] - url = 'https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v2/?format=json&key=%s&steamids=%s' % (self.token, steamidlist) + url = ( + "https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v2/?format=json&key=%s&steamids=%s" + % (self.token, steamidlist) + ) responses.append(executor.submit(urlopen, url)) for responseF in responses: response = responseF.result() if response.status != 200: continue - data = response.read().decode('utf-8') + data = response.read().decode("utf-8") jsondata = json.loads(data) - for player in jsondata['response']['players']: - currentid = player['steamid'] + for player in jsondata["response"]["players"]: + currentid = player["steamid"] self.sanitizePlayer(player) profile[currentid] = player @@ -96,7 +100,7 @@ class SteamAPI(): for steamid in steamids_copy: if steamid not in profile: continue - Caching.writeCache('profile', steamid, json.dumps(profile[steamid])) + Caching.writeCache("profile", steamid, json.dumps(profile[steamid])) return profile @@ -108,7 +112,7 @@ class SteamAPI(): profiles = executor.submit(self.getProfiles, steamids).result() for steamid in steamids: - profiles[steamid]['friends'] = results[steamid].result() + profiles[steamid]["friends"] = results[steamid].result() return profiles @@ -121,24 +125,33 @@ class SteamAPI(): List of steamids. TODO(andre): We lose additional information here. """ if CACHE: - cache = Caching.readCache('friends', steamid, 15*60) + cache = Caching.readCache("friends", steamid, 15 * 60) if cache: return json.loads(cache) - url = 'https://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key=%s&steamid=%s&relationship=friend' % (self.token, str(steamid)) + url = ( + "https://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key=%s&steamid=%s&relationship=friend" + % (self.token, str(steamid)) + ) try: response = urlopen(url) - data = response.read().decode('utf-8') + data = response.read().decode("utf-8") jsondata = json.loads(data) except HTTPError: # f.e. profile is private jsondata = None friends = [] - if jsondata and 'friendslist' in jsondata and 'friends' in jsondata['friendslist']: - friends = [friend['steamid'] for friend in jsondata['friendslist']['friends']] + if ( + jsondata + and "friendslist" in jsondata + and "friends" in jsondata["friendslist"] + ): + friends = [ + friend["steamid"] for friend in jsondata["friendslist"]["friends"] + ] if CACHE: - Caching.writeCache('friends', steamid, json.dumps(friends)) + Caching.writeCache("friends", steamid, json.dumps(friends)) return friends @@ -151,24 +164,27 @@ class SteamAPI(): gameName, gameVersion, availableGameStats (Achievements/Stats) """ if CACHE: - cache = Caching.readCache('gameschema', gameid, 7*24*60*60) + cache = Caching.readCache("gameschema", gameid, 7 * 24 * 60 * 60) if cache: jsondata = json.loads(cache) - return jsondata['game'] + return jsondata["game"] - url = 'http://api.steampowered.com/ISteamUserStats/GetSchemaForGame/v2/?key=%s&appid=%s&format=json' % (self.token, str(gameid)) + url = ( + "http://api.steampowered.com/ISteamUserStats/GetSchemaForGame/v2/?key=%s&appid=%s&format=json" + % (self.token, str(gameid)) + ) try: response = urlopen(url) - data = response.read().decode('utf-8') + data = response.read().decode("utf-8") jsondata = json.loads(data) except HTTPError as e: # f.e. profile is private jsondata = None - if 'game' in jsondata: + if "game" in jsondata: if CACHE: - Caching.writeCache('gameschema', gameid, json.dumps(jsondata)) - return jsondata['game'] + Caching.writeCache("gameschema", gameid, json.dumps(jsondata)) + return jsondata["game"] return None def getPlayerGames(self, steamid): @@ -180,24 +196,30 @@ class SteamAPI(): Tuple with (number of games, gameinfo [appid, name, playtime_2weeks, playtime_forever, icons]) """ if CACHE: - cache = Caching.readCache('games', steamid, 60*60) + cache = Caching.readCache("games", steamid, 60 * 60) if cache: jsondata = json.loads(cache) - return ( jsondata['response']['game_count'], jsondata['response']['games'] ) - - url = 'http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s&include_appinfo=1&format=json' % (self.token, str(steamid)) + return ( + jsondata["response"]["game_count"], + jsondata["response"]["games"], + ) + + url = ( + "http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s&include_appinfo=1&format=json" + % (self.token, str(steamid)) + ) try: response = urlopen(url) - data = response.read().decode('utf-8') + data = response.read().decode("utf-8") jsondata = json.loads(data) except HTTPError: # f.e. profile is private jsondata = None - if 'response' in jsondata and 'games' in jsondata['response']: + if "response" in jsondata and "games" in jsondata["response"]: if CACHE: - Caching.writeCache('games', steamid, json.dumps(jsondata)) - return ( jsondata['response']['game_count'], jsondata['response']['games'] ) + Caching.writeCache("games", steamid, json.dumps(jsondata)) + return (jsondata["response"]["game_count"], jsondata["response"]["games"]) return None def getUserstatsForGame(self, steamid, gameid): @@ -210,34 +232,45 @@ class SteamAPI(): dict() with statname and value """ if CACHE: - cache = Caching.readCache('usergamestats', '%s-%s' % (str(steamid), str(gameid)), 24*60*60) + cache = Caching.readCache( + "usergamestats", "%s-%s" % (str(steamid), str(gameid)), 24 * 60 * 60 + ) if cache: cachedata = json.loads(cache) return cachedata - url = 'http://api.steampowered.com/ISteamUserStats/GetUserStatsForGame/v0001/?key=%s&steamid=%s&appid=%s' % (self.token, str(steamid), str(gameid)) + url = ( + "http://api.steampowered.com/ISteamUserStats/GetUserStatsForGame/v0001/?key=%s&steamid=%s&appid=%s" + % (self.token, str(steamid), str(gameid)) + ) try: response = urlopen(url) - data = response.read().decode('utf-8') + data = response.read().decode("utf-8") jsondata = json.loads(data) - statslist = jsondata['playerstats']['stats'] + statslist = jsondata["playerstats"]["stats"] userstats = dict() for stat in statslist: - userstats[stat] = statslist[stat]['value'] + userstats[stat] = statslist[stat]["value"] except HTTPError: # f.e. profile is private userstats = None if userstats: if CACHE: - cache = Caching.writeCache('usergamestats', '%s-%s' % (str(steamid), str(gameid)), json.dumps(userstats)) + cache = Caching.writeCache( + "usergamestats", + "%s-%s" % (str(steamid), str(gameid)), + json.dumps(userstats), + ) return userstats return None def getMultipleUserUserstatsForGame(self, steamids, gameid): futures = dict() for steamid in steamids: - futures[steamid] = executor.submit(self.getUserstatsForGame, steamid, gameid) + futures[steamid] = executor.submit( + self.getUserstatsForGame, steamid, gameid + ) result = dict() for steamid in steamids: @@ -254,24 +287,31 @@ class SteamAPI(): dict() with game_count and games, which contains appid + playtime_forever """ if CACHE: - cache = Caching.readCache('userownedgames', '%s' % (str(steamid)), 7*24*60*60) + cache = Caching.readCache( + "userownedgames", "%s" % (str(steamid)), 7 * 24 * 60 * 60 + ) if cache: cachedata = json.loads(cache) return cachedata - url = 'http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s' % (self.token, str(steamid)) + url = ( + "http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s" + % (self.token, str(steamid)) + ) try: response = urlopen(url) - data = response.read().decode('utf-8') + data = response.read().decode("utf-8") jsondata = json.loads(data) - if 'response' in jsondata: - jsondata = jsondata['response'] + if "response" in jsondata: + jsondata = jsondata["response"] except HTTPError: jsondata = None if jsondata: if CACHE: - cache = Caching.writeCache('userownedgames', '%s' % (str(steamid)), json.dumps(jsondata)) + cache = Caching.writeCache( + "userownedgames", "%s" % (str(steamid)), json.dumps(jsondata) + ) return jsondata return None @@ -291,24 +331,32 @@ class SteamAPI(): """ if CACHE: - cache = Caching.readCache('serverupdate', '%s-%s' % (str(gameid), str(gameversion)), 60*60) + cache = Caching.readCache( + "serverupdate", "%s-%s" % (str(gameid), str(gameversion)), 60 * 60 + ) if cache: cachedata = json.loads(cache) return cachedata - url = 'http://api.steampowered.com/ISteamApps/UpToDateCheck/v1?appId={0}&version={1}'.format(gameid, gameversion) + url = "http://api.steampowered.com/ISteamApps/UpToDateCheck/v1?appId={0}&version={1}".format( + gameid, gameversion + ) try: response = urlopen(url) - data = response.read().decode('utf-8') + data = response.read().decode("utf-8") jsondata = json.loads(data) - if 'response' in jsondata: - jsondata = jsondata['response'] + if "response" in jsondata: + jsondata = jsondata["response"] except HTTPError: jsondata = None if jsondata: if CACHE: - cache = Caching.writeCache('serverupdate', '%s-%s' % (str(gameid), str(gameversion)), json.dumps(jsondata)) + cache = Caching.writeCache( + "serverupdate", + "%s-%s" % (str(gameid), str(gameversion)), + json.dumps(jsondata), + ) return jsondata return None @@ -326,33 +374,43 @@ class SteamAPI(): def getDataForPremadefinder(self, steamids): futures = dict() - futures['profiles'] = executor.submit(self.getProfiles, steamids) - futures['ownedGames'] = dict() - futures['userstats'] = dict() - futures['friends'] = dict() + futures["profiles"] = executor.submit(self.getProfiles, steamids) + futures["ownedGames"] = dict() + futures["userstats"] = dict() + futures["friends"] = dict() for steamid in steamids: - futures['ownedGames'][steamid] = executor.submit(self.getOwnedGames, steamid) - futures['userstats'][steamid] = executor.submit(self.getUserstatsForGame, steamid, 730) - futures['friends'][steamid] = executor.submit(self.getFriends, steamid) - - profiles = futures['profiles'].result() + futures["ownedGames"][steamid] = executor.submit( + self.getOwnedGames, steamid + ) + futures["userstats"][steamid] = executor.submit( + self.getUserstatsForGame, steamid, 730 + ) + futures["friends"][steamid] = executor.submit(self.getFriends, steamid) + + profiles = futures["profiles"].result() for steamid in profiles: - profiles[steamid]['_friends'] = futures['friends'][steamid].result() - profiles[steamid]['_userstats'] = futures['userstats'][steamid].result() - profiles[steamid]['_ownedGames'] = futures['ownedGames'][steamid].result() - profiles[steamid]['_ownedPlayedGames'] = 'n/a' - if profiles[steamid]['_ownedGames']: - profiles[steamid]['_ownedPlayedGames'] = len([game for game in profiles[steamid]['_ownedGames']['games'] if game['playtime_forever'] > 0]) + profiles[steamid]["_friends"] = futures["friends"][steamid].result() + profiles[steamid]["_userstats"] = futures["userstats"][steamid].result() + profiles[steamid]["_ownedGames"] = futures["ownedGames"][steamid].result() + profiles[steamid]["_ownedPlayedGames"] = "n/a" + if profiles[steamid]["_ownedGames"]: + profiles[steamid]["_ownedPlayedGames"] = len( + [ + game + for game in profiles[steamid]["_ownedGames"]["games"] + if game["playtime_forever"] > 0 + ] + ) return profiles @staticmethod def sanitizePlayer(player): - if 'lastlogoff' not in player: - player['lastlogoff'] = -1 + if "lastlogoff" not in player: + player["lastlogoff"] = -1 + if __name__ == "__main__": # TODO(andre): Maybe run tests here? - print('This is a module.') - + print("This is a module.") -- cgit v1.2.3