summaryrefslogtreecommitdiff
path: root/SteamAPI.py
diff options
context:
space:
mode:
authorAndré Glüpker <git@wgmd.de>2017-10-01 14:50:19 +0200
committerAndré Glüpker <git@wgmd.de>2017-10-01 14:50:19 +0200
commit1585395e7e530665c565b88fea15cbea68d3a88d (patch)
tree204033b68d01e27036a68d625f4474f3254880c2 /SteamAPI.py
downloadsteam-1585395e7e530665c565b88fea15cbea68d3a88d.tar.gz
steam-1585395e7e530665c565b88fea15cbea68d3a88d.tar.bz2
steam-1585395e7e530665c565b88fea15cbea68d3a88d.zip
Initial public release
Diffstat (limited to 'SteamAPI.py')
-rw-r--r--SteamAPI.py209
1 files changed, 209 insertions, 0 deletions
diff --git a/SteamAPI.py b/SteamAPI.py
new file mode 100644
index 0000000..580b375
--- /dev/null
+++ b/SteamAPI.py
@@ -0,0 +1,209 @@
+#!/usr/bin/env python3
+
+from concurrent.futures import ThreadPoolExecutor
+from urllib.error import HTTPError
+from urllib.error import URLError
+from urllib.request import urlopen
+import json
+from Caching import Caching
+
+##################################################
+## Settings
+##################################################
+# Directory for a local cache & time to live for cache items in seconds
+CACHE = True
+
+##################################################
+
+class SteamAPI():
+ def __init__(self, token):
+ self.token = token
+
+ def getProfiles(self, steamids):
+ """Get steam profiles.
+
+ Args:
+ steamids: Steamids to fetch profiles for
+ Returns:
+ dict() with str(steamid) as keys
+ """
+ profile = dict()
+ steamids_copy = [str(steamid) for steamid in steamids]
+
+ if CACHE:
+ cachedids = []
+ for steamid in steamids_copy:
+ cache = Caching.readCache('profile', steamid, 15)
+ if cache:
+ cachedids.append(steamid)
+ profile[steamid] = json.loads(cache)
+ for steamid in cachedids:
+ steamids_copy.remove(steamid)
+
+ if len(steamids_copy):
+ steamidlist = ','.join([str(x) for x in steamids_copy])
+ url = 'https://api.steampowered.com/ISteamUser/GetPlayerSummaries/v0002/?format=json&key=%s&steamids=%s' % (self.token, steamidlist)
+ # print(url)
+ response = urlopen(url)
+ if response.status != 200:
+ print('Isn\'t this an HTTPError?')
+ return []
+ data = response.read().decode('utf-8')
+ jsondata = json.loads(data)
+
+ for player in jsondata['response']['players']:
+ currentid = player['steamid']
+ profile[currentid] = player
+
+ if CACHE:
+ # save newly loaded profiles
+ for steamid in steamids_copy:
+ Caching.writeCache('profile', steamid, json.dumps(profile[steamid]))
+
+ return profile
+
+ def getMultipleFriends(self, steamids):
+ executor = ThreadPoolExecutor(max_workers=10)
+ # Ask steam about friends for each
+ results = dict()
+ for steamid in steamids:
+ results[steamid] = executor.submit(self.getFriends, steamid)
+
+ profiles = executor.submit(self.getProfiles, steamids).result()
+
+ for steamid in steamids:
+ profiles[steamid]['friends'] = results[steamid].result()
+
+ return profiles
+
+ def getFriends(self, steamid):
+ """Fetch steam friends for given steamid.
+
+ Args:
+ steamid: Steamid, whose friendslist to fetch
+ Returns:
+ List of steamids. TODO(andre): We lose additional information here.
+ """
+ if CACHE:
+ cache = Caching.readCache('friends', steamid, 15*60)
+ if cache:
+ return json.loads(cache)
+
+ url = 'https://api.steampowered.com/ISteamUser/GetFriendList/v0001/?key=%s&steamid=%s&relationship=friend' % (self.token, str(steamid))
+ # print(url)
+ try:
+ response = urlopen(url)
+ data = response.read().decode('utf-8')
+ jsondata = json.loads(data)
+ except HTTPError:
+ # f.e. profile is private
+ jsondata = None
+
+ friends = []
+ if jsondata and 'friendslist' in jsondata and 'friends' in jsondata['friendslist']:
+ friends = [friend['steamid'] for friend in jsondata['friendslist']['friends']]
+ if CACHE:
+ Caching.writeCache('friends', steamid, json.dumps(friends))
+
+ return friends
+
+ def getGameSchema(self, gameid):
+ """Fetch info about a game.
+
+ Args:
+ gameid: Appid of the game
+ Returns:
+ gameName, gameVersion, availableGameStats (Achievements/Stats)
+ """
+ if CACHE:
+ cache = Caching.readCache('gameschema', gameid, 7*24*60*60)
+ if cache:
+ jsondata = json.loads(cache)
+ return jsondata['game']
+
+ url = 'http://api.steampowered.com/ISteamUserStats/GetSchemaForGame/v2/?key=%s&appid=%s&format=json' % (self.token, str(gameid))
+ print(url)
+ try:
+ response = urlopen(url)
+ data = response.read().decode('utf-8')
+ jsondata = json.loads(data)
+ except HTTPError as e:
+ # f.e. profile is private
+ print('Fetch failed.', e.code, e.reason)
+ jsondata = None
+
+ if 'game' in jsondata:
+ if CACHE:
+ Caching.writeCache('gameschema', gameid, json.dumps(jsondata))
+ return jsondata['game']
+ else:
+ print('No game in json:', jsondata)
+ return None
+
+ def getGames(self, steamid):
+ """Fetch a list of games a person possesses.
+
+ Args:
+ steamid: Steamid, whose gamelist to fetch
+ Returns:
+ Tuple with (number of games, gameinfo [appid, name, playtime_2weeks, playtime_forever, icons])
+ """
+ if CACHE:
+ cache = Caching.readCache('games', steamid, 60*60)
+ if cache:
+ jsondata = json.loads(cache)
+ return ( jsondata['response']['game_count'], jsondata['response']['games'] )
+
+ url = 'http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s&include_appinfo=1&format=json' % (self.token, str(steamid))
+ print(url)
+ try:
+ response = urlopen(url)
+ data = response.read().decode('utf-8')
+ jsondata = json.loads(data)
+ except HTTPError:
+ # f.e. profile is private
+ jsondata = None
+
+ if 'response' in jsondata and 'games' in jsondata['response']:
+ if CACHE:
+ Caching.writeCache('games', steamid, json.dumps(jsondata))
+ return ( jsondata['response']['game_count'], jsondata['response']['games'] )
+ return None
+
+ def getPlayerGameStats(self, steamid, gameid):
+ """Fetch the list of achievements a person achieved in a game
+
+ Args:
+ steamid: Steamid of the particular user
+ gameid: Appid of the game we want to check
+ Returns:
+ Tuple with (number of games, gameinfo [appid, name, playtime_2weeks, playtime_forever, icons])
+ """
+ if CACHE:
+ cache = Caching.readCache('playergamestats', '%s-%s' % (str(steamid), str(gameid)), 24*60*60)
+ if cache:
+ jsondata = json.loads(cache)
+ return jsondata
+ else:
+ print('nocache')
+
+ url = 'http://api.steampowered.com/ISteamUserStats/GetPlayerAchievements/v0001/?key=%s&steamid=%s&appid=%s' % (self.token, str(steamid), str(gameid))
+ print(url)
+ try:
+ response = urlopen(url)
+ data = response.read().decode('utf-8')
+ jsondata = json.loads(data)
+ except HTTPError:
+ # f.e. profile is private
+ jsondata = None
+
+ if 'playerstats' in jsondata:
+ if CACHE:
+ cache = Caching.writeCache('playergamestats', '%s-%s' % (str(steamid), str(gameid)), json.dumps(jsondata))
+ return jsondata
+ return None
+
+if __name__ == "__main__":
+ # TODO(andre): Maybe run tests here?
+ print('This is a module.')
+