diff options
author | André Glüpker <git@wgmd.de> | 2017-10-01 14:50:19 +0200 |
---|---|---|
committer | André Glüpker <git@wgmd.de> | 2017-10-01 14:50:19 +0200 |
commit | 1585395e7e530665c565b88fea15cbea68d3a88d (patch) | |
tree | 204033b68d01e27036a68d625f4474f3254880c2 /SteamAPI.py | |
download | steam-1585395e7e530665c565b88fea15cbea68d3a88d.tar.gz steam-1585395e7e530665c565b88fea15cbea68d3a88d.tar.bz2 steam-1585395e7e530665c565b88fea15cbea68d3a88d.zip |
Initial public release
Diffstat (limited to 'SteamAPI.py')
-rw-r--r-- | SteamAPI.py | 209 |
1 files changed, 209 insertions, 0 deletions
diff --git a/SteamAPI.py b/SteamAPI.py new file mode 100644 index 0000000..580b375 --- /dev/null +++ b/SteamAPI.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python3 + +from concurrent.futures import ThreadPoolExecutor +from urllib.error import HTTPError +from urllib.error import URLError +from urllib.request import urlopen +import json +from Caching import Caching + +################################################## +## Settings +################################################## +# Directory for a local cache & time to live for cache items in seconds +CACHE = True + +################################################## + +class SteamAPI(): + def __init__(self, token): + self.token = token + + def getProfiles(self, steamids): + """Get steam profiles. + + Args: + steamids: Steamids to fetch profiles for + Returns: + dict() with str(steamid) as keys + """ + profile = dict() + steamids_copy = [str(steamid) for steamid in steamids] + + if CACHE: + cachedids = [] + for steamid in steamids_copy: + cache = Caching.readCache('profile', steamid, 15) + if cache: + cachedids.append(steamid) + profile[steamid] = json.loads(cache) + for steamid in cachedids: + steamids_copy.remove(steamid) + + if len(steamids_copy): + steamidlist = ','.join([str(x) for x in steamids_copy]) + url = 'https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?format=json&key=%s&steamids=%s' % (self.token, steamidlist) + # print(url) + response = urlopen(url) + if response.status != 200: + print('Isn\'t this an HTTPError?') + return [] + data = response.read().decode('utf-8') + jsondata = json.loads(data) + + for player in jsondata['response']['players']: + currentid = player['steamid'] + profile[currentid] = player + + if CACHE: + # save newly loaded profiles + for steamid in steamids_copy: + Caching.writeCache('profile', steamid, json.dumps(profile[steamid])) + + return profile + + def getMultipleFriends(self, steamids): + executor = ThreadPoolExecutor(max_workers=10) + # Ask steam about friends for each + results = dict() + for steamid in steamids: + results[steamid] = executor.submit(self.getFriends, steamid) + + profiles = executor.submit(self.getProfiles, steamids).result() + + for steamid in steamids: + profiles[steamid]['friends'] = results[steamid].result() + + return profiles + + def getFriends(self, steamid): + """Fetch steam friends for given steamid. + + Args: + steamid: Steamid, whose friendslist to fetch + Returns: + List of steamids. TODO(andre): We lose additional information here. + """ + if CACHE: + cache = Caching.readCache('friends', steamid, 15*60) + if cache: + return json.loads(cache) + + url = 'https://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key=%s&steamid=%s&relationship=friend' % (self.token, str(steamid)) + # print(url) + try: + response = urlopen(url) + data = response.read().decode('utf-8') + jsondata = json.loads(data) + except HTTPError: + # f.e. profile is private + jsondata = None + + friends = [] + if jsondata and 'friendslist' in jsondata and 'friends' in jsondata['friendslist']: + friends = [friend['steamid'] for friend in jsondata['friendslist']['friends']] + if CACHE: + Caching.writeCache('friends', steamid, json.dumps(friends)) + + return friends + + def getGameSchema(self, gameid): + """Fetch info about a game. + + Args: + gameid: Appid of the game + Returns: + gameName, gameVersion, availableGameStats (Achievements/Stats) + """ + if CACHE: + cache = Caching.readCache('gameschema', gameid, 7*24*60*60) + if cache: + jsondata = json.loads(cache) + return jsondata['game'] + + url = 'http://api.steampowered.com/ISteamUserStats/GetSchemaForGame/v2/?key=%s&appid=%s&format=json' % (self.token, str(gameid)) + print(url) + try: + response = urlopen(url) + data = response.read().decode('utf-8') + jsondata = json.loads(data) + except HTTPError as e: + # f.e. profile is private + print('Fetch failed.', e.code, e.reason) + jsondata = None + + if 'game' in jsondata: + if CACHE: + Caching.writeCache('gameschema', gameid, json.dumps(jsondata)) + return jsondata['game'] + else: + print('No game in json:', jsondata) + return None + + def getGames(self, steamid): + """Fetch a list of games a person possesses. + + Args: + steamid: Steamid, whose gamelist to fetch + Returns: + Tuple with (number of games, gameinfo [appid, name, playtime_2weeks, playtime_forever, icons]) + """ + if CACHE: + cache = Caching.readCache('games', steamid, 60*60) + if cache: + jsondata = json.loads(cache) + return ( jsondata['response']['game_count'], jsondata['response']['games'] ) + + url = 'http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s&include_appinfo=1&format=json' % (self.token, str(steamid)) + print(url) + try: + response = urlopen(url) + data = response.read().decode('utf-8') + jsondata = json.loads(data) + except HTTPError: + # f.e. profile is private + jsondata = None + + if 'response' in jsondata and 'games' in jsondata['response']: + if CACHE: + Caching.writeCache('games', steamid, json.dumps(jsondata)) + return ( jsondata['response']['game_count'], jsondata['response']['games'] ) + return None + + def getPlayerGameStats(self, steamid, gameid): + """Fetch the list of achievements a person achieved in a game + + Args: + steamid: Steamid of the particular user + gameid: Appid of the game we want to check + Returns: + Tuple with (number of games, gameinfo [appid, name, playtime_2weeks, playtime_forever, icons]) + """ + if CACHE: + cache = Caching.readCache('playergamestats', '%s-%s' % (str(steamid), str(gameid)), 24*60*60) + if cache: + jsondata = json.loads(cache) + return jsondata + else: + print('nocache') + + url = 'http://api.steampowered.com/ISteamUserStats/GetPlayerAchievements/v0001/?key=%s&steamid=%s&appid=%s' % (self.token, str(steamid), str(gameid)) + print(url) + try: + response = urlopen(url) + data = response.read().decode('utf-8') + jsondata = json.loads(data) + except HTTPError: + # f.e. profile is private + jsondata = None + + if 'playerstats' in jsondata: + if CACHE: + cache = Caching.writeCache('playergamestats', '%s-%s' % (str(steamid), str(gameid)), json.dumps(jsondata)) + return jsondata + return None + +if __name__ == "__main__": + # TODO(andre): Maybe run tests here? + print('This is a module.') + |