#!/usr/bin/env python3
"""Small client for a handful of Steam Web API endpoints with optional caching."""
from concurrent.futures import ThreadPoolExecutor
from urllib.error import HTTPError
from urllib.error import URLError
from urllib.request import urlopen
import json

from Caching import Caching

##################################################
## Settings
##################################################

# Whether API responses are read from / written to the local cache.
# The time to live of each kind of item is passed per call to Caching.readCache.
CACHE = True

##################################################

executor = ThreadPoolExecutor(max_workers=25)


def _fetchJson(url):
    """Download `url` and parse the UTF-8 body as JSON.

    The response is closed before returning. Errors raised by urlopen
    (e.g. HTTPError) propagate to the caller.
    """
    with urlopen(url) as response:
        return json.loads(response.read().decode("utf-8"))


class SteamAPI:
    """Wrapper around the Steam Web API endpoints used by this project."""

    # Number of steamids sent per GetPlayerSummaries request.
    _PROFILE_BATCH_SIZE = 50

    def __init__(self, token):
        """Store the Steam Web API key used for authenticated endpoints."""
        self.token = token

    def getGames(self):
        """Get list of steam games by app id.

        Returns:
            dict() with str(appid) as keys, str gamename as value, or None
            if the request failed or the response has no app list.
        """
        if CACHE:
            cache = Caching.readCache("general", "gamelist", 7 * 24 * 60 * 60)
            if cache:
                return json.loads(cache)

        url = "https://api.steampowered.com/ISteamApps/GetAppList/v2"
        try:
            applist = _fetchJson(url)["applist"]["apps"]
        except (KeyError, HTTPError):
            return None

        # str keys for json conversion
        gamelist = {str(app["appid"]): app["name"] for app in applist}

        if CACHE:
            Caching.writeCache("general", "gamelist", json.dumps(gamelist))
        return gamelist

    def getProfiles(self, steamids):
        """Get steam profiles.

        Args:
            steamids: Steamids to fetch profiles for

        Returns:
            dict() with str(steamid) as keys. Ids for which neither the cache
            nor the API yielded a profile are absent from the result.
        """
        profile = dict()
        pending = [str(steamid) for steamid in steamids]

        if CACHE:
            uncached = []
            for steamid in pending:
                cache = Caching.readCache("profile", steamid, 15)
                if cache:
                    profile[steamid] = json.loads(cache)
                else:
                    uncached.append(steamid)
            pending = uncached

        # Request the remaining ids in batches, all batches in parallel.
        responses = []
        for start in range(0, len(pending), self._PROFILE_BATCH_SIZE):
            steamidlist = ",".join(pending[start:start + self._PROFILE_BATCH_SIZE])
            url = (
                "https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v2/?format=json&key=%s&steamids=%s"
                % (self.token, steamidlist)
            )
            responses.append(executor.submit(urlopen, url))

        # Ids actually returned by the API; only these need to be cached.
        fetched = []
        for responseF in responses:
            try:
                response = responseF.result()
            except HTTPError:
                # urlopen raises for error statuses; skip just this batch.
                continue
            with response:
                if response.status != 200:
                    continue
                data = response.read().decode("utf-8")
            jsondata = json.loads(data)
            for player in jsondata.get("response", {}).get("players", []):
                currentid = player["steamid"]
                self.sanitizePlayer(player)
                profile[currentid] = player
                fetched.append(currentid)

        if CACHE:
            # save newly loaded profiles
            for steamid in fetched:
                Caching.writeCache("profile", steamid, json.dumps(profile[steamid]))

        return profile

    def getMultipleFriends(self, steamids):
        """Fetch profiles for several users and attach each friends list.

        Args:
            steamids: Steamids to fetch profiles and friends for

        Returns:
            dict() as returned by getProfiles; every profile that was found
            gets an additional "friends" key holding the getFriends result.
        """
        steamids = list(steamids)  # iterated more than once below
        results = {
            steamid: executor.submit(self.getFriends, steamid) for steamid in steamids
        }

        # Run on the calling thread: getProfiles itself waits on pool tasks,
        # so it should not occupy a pool worker while doing so.
        profiles = self.getProfiles(steamids)

        for steamid in steamids:
            friends = results[steamid].result()
            # getProfiles keys its result by str(steamid) and may omit ids.
            entry = profiles.get(str(steamid))
            if entry is not None:
                entry["friends"] = friends
        return profiles

    def getFriends(self, steamid):
        """Fetch steam friends for given steamid.

        Args:
            steamid: Steamid, whose friendslist to fetch

        Returns:
            List of steamids; empty if the request failed or the response
            contains no friends list.
            TODO(andre): We lose additional information here.
        """
        if CACHE:
            cache = Caching.readCache("friends", steamid, 15 * 60)
            if cache:
                return json.loads(cache)

        url = (
            "https://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key=%s&steamid=%s&relationship=friend"
            % (self.token, str(steamid))
        )
        try:
            jsondata = _fetchJson(url)
        except HTTPError:
            # f.e. profile is private
            jsondata = None

        friends = []
        if (
            jsondata
            and "friendslist" in jsondata
            and "friends" in jsondata["friendslist"]
        ):
            friends = [
                friend["steamid"] for friend in jsondata["friendslist"]["friends"]
            ]

        if CACHE:
            Caching.writeCache("friends", steamid, json.dumps(friends))
        return friends

    def getGameSchema(self, gameid):
        """Fetch info about a game.

        Args:
            gameid: Appid of the game

        Returns:
            The "game" object of the response (gameName, gameVersion,
            availableGameStats (Achievements/Stats)), or None if the request
            failed or the response has no "game" entry.
        """
        if CACHE:
            cache = Caching.readCache("gameschema", gameid, 7 * 24 * 60 * 60)
            if cache:
                jsondata = json.loads(cache)
                return jsondata["game"]

        url = (
            "https://api.steampowered.com/ISteamUserStats/GetSchemaForGame/v2/?key=%s&appid=%s&format=json"
            % (self.token, str(gameid))
        )
        try:
            jsondata = _fetchJson(url)
        except HTTPError:
            jsondata = None

        # jsondata is None after an HTTPError, so test it before the lookup.
        if jsondata and "game" in jsondata:
            if CACHE:
                Caching.writeCache("gameschema", gameid, json.dumps(jsondata))
            return jsondata["game"]
        return None

    def getPlayerGames(self, steamid):
        """Fetch a list of games a person possesses.

        Args:
            steamid: Steamid, whose gamelist to fetch

        Returns:
            Tuple with (number of games, gameinfo [appid, name,
            playtime_2weeks, playtime_forever, icons]), or None if the
            request failed or the response contains no games.
        """
        if CACHE:
            cache = Caching.readCache("games", steamid, 60 * 60)
            if cache:
                jsondata = json.loads(cache)
                return (
                    jsondata["response"]["game_count"],
                    jsondata["response"]["games"],
                )

        url = (
            "https://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s&include_appinfo=1&format=json"
            % (self.token, str(steamid))
        )
        try:
            jsondata = _fetchJson(url)
        except HTTPError:
            # f.e. profile is private
            jsondata = None

        # jsondata is None after an HTTPError, so test it before the lookups.
        if jsondata and "response" in jsondata and "games" in jsondata["response"]:
            if CACHE:
                Caching.writeCache("games", steamid, json.dumps(jsondata))
            return (jsondata["response"]["game_count"], jsondata["response"]["games"])
        return None

    def getUserstatsForGame(self, steamid, gameid):
        """Fetch the available game stats for a player in the specified game.

        Args:
            steamid: Steamid of the particular user
            gameid: Appid of the game we want to check

        Returns:
            dict() with statname and value, or None if the request failed or
            no stats are available.
        """
        cachekey = "%s-%s" % (str(steamid), str(gameid))
        if CACHE:
            cache = Caching.readCache("usergamestats", cachekey, 24 * 60 * 60)
            if cache:
                return json.loads(cache)

        url = (
            "https://api.steampowered.com/ISteamUserStats/GetUserStatsForGame/v0001/?key=%s&steamid=%s&appid=%s"
            % (self.token, str(steamid), str(gameid))
        )
        try:
            jsondata = _fetchJson(url)
            # The stats are read as a mapping of statname -> {"value": ...}.
            statslist = jsondata["playerstats"]["stats"]
            userstats = {stat: statslist[stat]["value"] for stat in statslist}
        except (HTTPError, KeyError):
            # f.e. profile is private, or the response carries no stats
            userstats = None

        if userstats:
            if CACHE:
                Caching.writeCache("usergamestats", cachekey, json.dumps(userstats))
            return userstats
        return None

    def getMultipleUserUserstatsForGame(self, steamids, gameid):
        """Fetch the stats of several users for one game in parallel.

        Args:
            steamids: Steamids of the users
            gameid: Appid of the game we want to check

        Returns:
            dict() mapping each given steamid to its getUserstatsForGame result.
        """
        steamids = list(steamids)  # iterated more than once below
        futures = {
            steamid: executor.submit(self.getUserstatsForGame, steamid, gameid)
            for steamid in steamids
        }
        return {steamid: futures[steamid].result() for steamid in steamids}

    def getOwnedGames(self, steamid):
        """Fetch games owned by a player.

        Args:
            steamid: Steamid of the particular user

        Returns:
            dict() with game_count and games, which contains appid +
            playtime_forever, or None if the request failed or the response
            is empty.
        """
        cachekey = "%s" % (str(steamid))
        if CACHE:
            cache = Caching.readCache("userownedgames", cachekey, 7 * 24 * 60 * 60)
            if cache:
                return json.loads(cache)

        url = (
            "https://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s"
            % (self.token, str(steamid))
        )
        try:
            jsondata = _fetchJson(url)
            if "response" in jsondata:
                jsondata = jsondata["response"]
        except HTTPError:
            jsondata = None

        if jsondata:
            if CACHE:
                Caching.writeCache("userownedgames", cachekey, json.dumps(jsondata))
            return jsondata
        return None

    def getGameUpdateState(self, gameid, gameversion):
        """Fetch the current game update state.

        Args:
            gameid: Steam game id
            gameversion: Current version of the game

        Returns:
            None if the request failed or the response is empty, otherwise
            dict() with the following keys:
            - success Boolean Was able to check version
            - up_to_date Boolean If the game is up to date
            - version_is_listable Boolean If the game version is listed
            - required_version (int) ? New version ?
            - message (String) Message to display
        """
        cachekey = "%s-%s" % (str(gameid), str(gameversion))
        if CACHE:
            cache = Caching.readCache("serverupdate", cachekey, 60 * 60)
            if cache:
                return json.loads(cache)

        url = "https://api.steampowered.com/ISteamApps/UpToDateCheck/v1?appId={0}&version={1}".format(
            gameid, gameversion
        )
        try:
            jsondata = _fetchJson(url)
            if "response" in jsondata:
                jsondata = jsondata["response"]
        except HTTPError:
            jsondata = None

        if jsondata:
            if CACHE:
                Caching.writeCache("serverupdate", cachekey, json.dumps(jsondata))
            return jsondata
        return None

    def getMultipleUserOwnedGames(self, steamids):
        """Fetch the owned games of several users in parallel.

        Args:
            steamids: Steamids of the users

        Returns:
            dict() mapping each given steamid to its getOwnedGames result.
        """
        steamids = list(steamids)  # iterated more than once below
        futures = {
            steamid: executor.submit(self.getOwnedGames, steamid)
            for steamid in steamids
        }
        return {steamid: futures[steamid].result() for steamid in steamids}

    def getDataForPrematefinder(self, steamids):
        """Collect profile, friends, owned games and stats for several users.

        Args:
            steamids: Steamids of the users

        Returns:
            dict() as returned by getProfiles, where every profile gets the
            extra keys "_friends", "_userstats", "_ownedGames" and
            "_ownedPlayedGames" (number of owned games with playtime, or
            "n/a" if the owned games are unavailable).
        """
        steamids = list(steamids)  # iterated more than once below

        # Futures are keyed by str(steamid) because getProfiles keys its
        # result that way, whatever type the caller passed in.
        ownedGames = dict()
        userstats = dict()
        friends = dict()
        for steamid in steamids:
            key = str(steamid)
            ownedGames[key] = executor.submit(self.getOwnedGames, steamid)
            # NOTE(review): 730 is presumably the appid of the game the
            # prematefinder targets - confirm.
            userstats[key] = executor.submit(self.getUserstatsForGame, steamid, 730)
            friends[key] = executor.submit(self.getFriends, steamid)

        # Runs on the calling thread while the pool handles the tasks above;
        # getProfiles waits on pool tasks itself and should not hold a worker.
        profiles = self.getProfiles(steamids)

        for steamid, profile in profiles.items():
            profile["_friends"] = friends[steamid].result()
            profile["_userstats"] = userstats[steamid].result()
            profile["_ownedGames"] = ownedGames[steamid].result()
            profile["_ownedPlayedGames"] = "n/a"
            if profile["_ownedGames"]:
                # An owned-games payload without a "games" list counts as 0.
                profile["_ownedPlayedGames"] = sum(
                    1
                    for game in profile["_ownedGames"].get("games", [])
                    if game["playtime_forever"] > 0
                )
        return profiles

    @staticmethod
    def sanitizePlayer(player):
        """Ensure `player` has a "lastlogoff" entry, defaulting to -1 (in place)."""
        player.setdefault("lastlogoff", -1)


if __name__ == "__main__":
    # TODO(andre): Maybe run tests here?
    print("This is a module.")