#!/usr/bin/env python3
"""Small client for a handful of Steam Web API endpoints, with optional caching."""

from concurrent.futures import ThreadPoolExecutor
from urllib.error import HTTPError
from urllib.error import URLError
from urllib.request import urlopen
import json

from Caching import Caching

##################################################
## Settings
##################################################

# Switch for the local cache (Caching module). Every lookup hands its own
# limit to Caching.readCache as third argument -- presumably the time to live
# of a cache item in seconds; verify against the Caching module.
CACHE = True

##################################################


def _fetchJson(url):
    """Download `url` and return the parsed JSON document.

    The response is always closed, even if reading or parsing fails.

    Raises:
        HTTPError: passed through from urlopen so that callers can decide
            what an error status means for them (f.e. a private profile).
    """
    with urlopen(url) as response:
        return json.loads(response.read().decode('utf-8'))


class SteamAPI():
    """Wrapper around the Steam Web API endpoints used by this project."""

    def __init__(self, token):
        """Store the Steam Web API key that is sent with keyed requests."""
        self.token = token

    def getGames(self):
        """Get list of steam games by app id.

        Returns:
            dict() with str(appid) as keys, str gamename as value,
            or None if the request failed or the answer had no app list.
        """
        if CACHE:
            cache = Caching.readCache('general', 'gamelist', 7*24*60*60)
            if cache:
                return json.loads(cache)

        url = 'https://api.steampowered.com/ISteamApps/GetAppList/v2'
        try:
            applist = _fetchJson(url)['applist']['apps']
        except (KeyError, HTTPError):
            return None

        # str keys for json conversion
        gamelist = {str(app['appid']): app['name'] for app in applist}

        if CACHE:
            Caching.writeCache('general', 'gamelist', json.dumps(gamelist))
        return gamelist

    def getProfiles(self, steamids):
        """Get steam profiles.

        Args:
            steamids: Steamids to fetch profiles for

        Returns:
            dict() with str(steamid) as keys. Ids the API does not return a
            player for are absent from the result. An empty dict() is
            returned if the API answers with a status other than 200.
        """
        profile = dict()
        pending = [str(steamid) for steamid in steamids]

        if CACHE:
            # Serve what we can from the cache, keep the rest for the request.
            uncached = []
            for steamid in pending:
                cache = Caching.readCache('profile', steamid, 15)
                if cache:
                    profile[steamid] = json.loads(cache)
                else:
                    uncached.append(steamid)
            pending = uncached

        if pending:
            url = 'https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v2/?format=json&key=%s&steamids=%s' % (self.token, ','.join(pending))
            with urlopen(url) as response:
                if response.status != 200:
                    # Same (falsy) signal as before, but of the documented type.
                    return dict()
                data = response.read().decode('utf-8')
            jsondata = json.loads(data)

            for player in jsondata['response']['players']:
                profile[player['steamid']] = player

            if CACHE:
                # save newly loaded profiles; a requested id may be missing
                # from the answer, so only store what actually arrived
                for steamid in pending:
                    if steamid in profile:
                        Caching.writeCache('profile', steamid, json.dumps(profile[steamid]))
        return profile

    def getMultipleFriends(self, steamids):
        """Fetch profiles and friend lists for several steamids in parallel.

        Args:
            steamids: Steamids to look up

        Returns:
            dict() as returned by getProfiles, where each profile that was
            found additionally carries its friend list under 'friends'.
        """
        steamids = list(steamids)
        # The with-block shuts the worker threads down once we are done.
        with ThreadPoolExecutor(max_workers=10) as executor:
            # Ask steam about friends for each
            results = dict()
            for steamid in steamids:
                # getProfiles keys its result by str(steamid), so do we.
                results[str(steamid)] = executor.submit(self.getFriends, steamid)

            profiles = executor.submit(self.getProfiles, steamids).result()
            for steamid, future in results.items():
                if steamid in profiles:
                    profiles[steamid]['friends'] = future.result()
        return profiles

    def getFriends(self, steamid):
        """Fetch steam friends for given steamid.

        Args:
            steamid: Steamid, whose friendslist to fetch

        Returns:
            List of steamids; empty if the request failed with an HTTP error
            or the answer contained no friend list.
            TODO(andre): We lose additional information here.
        """
        if CACHE:
            cache = Caching.readCache('friends', steamid, 15*60)
            if cache:
                return json.loads(cache)

        url = 'https://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key=%s&steamid=%s&relationship=friend' % (self.token, str(steamid))
        try:
            jsondata = _fetchJson(url)
        except HTTPError:
            # f.e. profile is private
            jsondata = None

        friends = []
        if jsondata and 'friendslist' in jsondata and 'friends' in jsondata['friendslist']:
            friends = [friend['steamid'] for friend in jsondata['friendslist']['friends']]

        # The (possibly empty) list is cached as well.
        if CACHE:
            Caching.writeCache('friends', steamid, json.dumps(friends))
        return friends

    def getGameSchema(self, gameid):
        """Fetch info about a game.

        Args:
            gameid: Appid of the game

        Returns:
            gameName, gameVersion, availableGameStats (Achievements/Stats),
            or None if the request failed or the answer had no 'game' entry.
        """
        if CACHE:
            cache = Caching.readCache('gameschema', gameid, 7*24*60*60)
            if cache:
                return json.loads(cache)['game']

        url = 'https://api.steampowered.com/ISteamUserStats/GetSchemaForGame/v2/?key=%s&appid=%s&format=json' % (self.token, str(gameid))
        try:
            jsondata = _fetchJson(url)
        except HTTPError:
            jsondata = None

        # jsondata is None after an HTTP error; do not look into it then.
        if not jsondata or 'game' not in jsondata:
            return None

        if CACHE:
            Caching.writeCache('gameschema', gameid, json.dumps(jsondata))
        return jsondata['game']

    def getPlayerGames(self, steamid):
        """Fetch a list of games a person possesses.

        Args:
            steamid: Steamid, whose gamelist to fetch

        Returns:
            Tuple with (number of games, gameinfo [appid, name, playtime_2weeks, playtime_forever, icons]),
            or None if the request failed or the answer listed no games.
        """
        if CACHE:
            cache = Caching.readCache('games', steamid, 60*60)
            if cache:
                jsondata = json.loads(cache)
                return (
                    jsondata['response']['game_count'],
                    jsondata['response']['games']
                )

        url = 'https://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s&include_appinfo=1&format=json' % (self.token, str(steamid))
        try:
            jsondata = _fetchJson(url)
        except HTTPError:
            # f.e. profile is private
            jsondata = None

        # jsondata is None after an HTTP error; do not look into it then.
        if not jsondata or 'response' not in jsondata or 'games' not in jsondata['response']:
            return None

        if CACHE:
            Caching.writeCache('games', steamid, json.dumps(jsondata))
        return (
            jsondata['response']['game_count'],
            jsondata['response']['games']
        )

    def getUserstatsForGame(self, steamid, gameid):
        """Fetch the available game stats for a player in the specified game.

        Args:
            steamid: Steamid of the particular user
            gameid: Appid of the game we want to check

        Returns:
            dict() with statname and value, or None if the request failed
            with an HTTP error or no stats were reported.
        """
        cachekey = '%s-%s' % (str(steamid), str(gameid))
        if CACHE:
            cache = Caching.readCache('usergamestats', cachekey, 24*60*60)
            if cache:
                return json.loads(cache)

        url = 'https://api.steampowered.com/ISteamUserStats/GetUserStatsForGame/v0001/?key=%s&steamid=%s&appid=%s' % (self.token, str(steamid), str(gameid))
        try:
            statslist = _fetchJson(url)['playerstats']['stats']
            # NOTE(review): stats are indexed by name here, i.e. this assumes
            # the v0001 answer maps stat name -> {'value': ...} -- confirm.
            userstats = {stat: statslist[stat]['value'] for stat in statslist}
        except HTTPError:
            # f.e. profile is private
            userstats = None

        if not userstats:
            return None

        if CACHE:
            Caching.writeCache('usergamestats', cachekey, json.dumps(userstats))
        return userstats

    def getMultipleUserUserstatsForGame(self, steamids, gameid):
        """Fetch the game stats of several players in parallel.

        Args:
            steamids: Steamids of the users
            gameid: Appid of the game we want to check

        Returns:
            dict() with the given steamids as keys and the result of
            getUserstatsForGame as values
        """
        steamids = list(steamids)
        # The with-block shuts the worker threads down once we are done.
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = dict()
            for steamid in steamids:
                futures[steamid] = executor.submit(self.getUserstatsForGame, steamid, gameid)

            result = dict()
            for steamid in steamids:
                result[steamid] = futures[steamid].result()
        return result

    def getOwnedGames(self, steamid):
        """Fetch games owned by a player.

        Args:
            steamid: Steamid of the particular user

        Returns:
            dict() with game_count and games, which contains appid + playtime_forever,
            or None if the request failed with an HTTP error or the answer was empty.
        """
        cachekey = '%s' % (str(steamid))
        if CACHE:
            cache = Caching.readCache('userownedgames', cachekey, 7*24*60*60)
            if cache:
                return json.loads(cache)

        url = 'https://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s' % (self.token, str(steamid))
        try:
            jsondata = _fetchJson(url)
            if 'response' in jsondata:
                jsondata = jsondata['response']
        except HTTPError:
            jsondata = None

        if not jsondata:
            return None

        if CACHE:
            Caching.writeCache('userownedgames', cachekey, json.dumps(jsondata))
        return jsondata

    def getGameUpdateState(self, gameid, gameversion):
        """Fetch the current game update state.

        Args:
            gameid: Steam game id
            gameversion: Current version of the game

        Returns:
            None if the request failed with an HTTP error or the answer was
            empty, otherwise dict() with the following keys:
                - success               Boolean     Was able to check version
                - up_to_date            Boolean     If the game is up to date
                - version_is_listable   Boolean     If the game version is listed
                - required_version      (int)       ? New version ?
                - message               (String)    Message to display
        """
        cachekey = '%s-%s' % (str(gameid), str(gameversion))
        if CACHE:
            cache = Caching.readCache('serverupdate', cachekey, 60*60)
            if cache:
                return json.loads(cache)

        url = 'https://api.steampowered.com/ISteamApps/UpToDateCheck/v1?appId={0}&version={1}'.format(gameid, gameversion)
        try:
            jsondata = _fetchJson(url)
            if 'response' in jsondata:
                jsondata = jsondata['response']
        except HTTPError:
            jsondata = None

        if not jsondata:
            return None

        if CACHE:
            Caching.writeCache('serverupdate', cachekey, json.dumps(jsondata))
        return jsondata

    def getMultipleUserOwnedGames(self, steamids):
        """Fetch the owned games of several players in parallel.

        Args:
            steamids: Steamids of the users

        Returns:
            dict() with the given steamids as keys and the result of
            getOwnedGames as values
        """
        steamids = list(steamids)
        # The with-block shuts the worker threads down once we are done.
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = dict()
            for steamid in steamids:
                futures[steamid] = executor.submit(self.getOwnedGames, steamid)

            result = dict()
            for steamid in steamids:
                result[steamid] = futures[steamid].result()
        return result

    def getDataForPremadefinder(self, steamids, gameid=730):
        """Collect profile, friends, owned games and game stats per player.

        All lookups are issued in parallel.

        Args:
            steamids: Steamids of the users
            gameid: Appid whose user stats are fetched; defaults to 730, the
                value that used to be hard-coded here

        Returns:
            dict() as returned by getProfiles, where every profile additionally carries
                - _friends           result of getFriends
                - _userstats         result of getUserstatsForGame
                - _ownedGames        result of getOwnedGames
                - _ownedPlayedGames  number of owned games with playtime_forever > 0
                                     (0 if the owned games could not be fetched)
        """
        steamids = list(steamids)
        # The with-block shuts the worker threads down once we are done.
        with ThreadPoolExecutor(max_workers=20) as executor:
            profilesFuture = executor.submit(self.getProfiles, steamids)
            ownedGames = dict()
            userstats = dict()
            friends = dict()
            for steamid in steamids:
                # getProfiles keys its result by str(steamid); use the same
                # keys here so that the lookups below match for int ids too.
                key = str(steamid)
                ownedGames[key] = executor.submit(self.getOwnedGames, steamid)
                userstats[key] = executor.submit(self.getUserstatsForGame, steamid, gameid)
                friends[key] = executor.submit(self.getFriends, steamid)

            profiles = profilesFuture.result()
            for steamid in profiles:
                player = profiles[steamid]
                player['_friends'] = friends[steamid].result()
                player['_userstats'] = userstats[steamid].result()
                player['_ownedGames'] = ownedGames[steamid].result()
                # getOwnedGames yields None on failure and the answer may lack
                # 'games' altogether; both count as "nothing played".
                games = (player['_ownedGames'] or {}).get('games', [])
                player['_ownedPlayedGames'] = len([game for game in games if game.get('playtime_forever', 0) > 0])
        return profiles


if __name__ == "__main__":
    # TODO(andre): Maybe run tests here?
    print('This is a module.')