summaryrefslogtreecommitdiff
path: root/SteamAPI.py
diff options
context:
space:
mode:
authorAndré Glüpker <git@wgmd.de>2018-01-26 00:47:54 +0100
committerAndré Glüpker <git@wgmd.de>2018-01-26 00:47:54 +0100
commit98a97bc04365db9becb1038f88e10866a829b280 (patch)
tree9124502891e384f406393651c07be36c721d2c21 /SteamAPI.py
parent81cfe943b265e8b9d41ad7d150cca92a537d7d2c (diff)
downloadsteam-98a97bc04365db9becb1038f88e10866a829b280.tar.gz
steam-98a97bc04365db9becb1038f88e10866a829b280.tar.bz2
steam-98a97bc04365db9becb1038f88e10866a829b280.zip
Ask SteamAPI for owned games and gamestats in CSGO.
This is used in Premade Finder to maybe identify Smurfs.
Diffstat (limited to 'SteamAPI.py')
-rw-r--r--SteamAPI.py100
1 files changed, 91 insertions, 9 deletions
diff --git a/SteamAPI.py b/SteamAPI.py
index 87b5c39..1790989 100644
--- a/SteamAPI.py
+++ b/SteamAPI.py
@@ -193,36 +193,118 @@ class SteamAPI():
return ( jsondata['response']['game_count'], jsondata['response']['games'] )
return None
- def getPlayerGameStats(self, steamid, gameid):
- """Fetch the list of achievements a person achieved in a game
+ def getUserstatsForGame(self, steamid, gameid):
+ """Fetch the available game stats for a player in the specified game.
Args:
steamid: Steamid of the particular user
gameid: Appid of the game we want to check
Returns:
- Tuple with (number of games, gameinfo [appid, name, playtime_2weeks, playtime_forever, icons])
+ dict() with statname and value
"""
if CACHE:
- cache = Caching.readCache('playergamestats', '%s-%s' % (str(steamid), str(gameid)), 24*60*60)
+ cache = Caching.readCache('usergamestats', '%s-%s' % (str(steamid), str(gameid)), 24*60*60)
if cache:
- jsondata = json.loads(cache)
- return jsondata
+ cachedata = json.loads(cache)
+ return cachedata
- url = 'http://api.steampowered.com/ISteamUserStats/GetPlayerAchievements/v0001/?key=%s&steamid=%s&appid=%s' % (self.token, str(steamid), str(gameid))
+ url = 'http://api.steampowered.com/ISteamUserStats/GetUserStatsForGame/v0001/?key=%s&steamid=%s&appid=%s' % (self.token, str(steamid), str(gameid))
try:
response = urlopen(url)
data = response.read().decode('utf-8')
jsondata = json.loads(data)
+ statslist = jsondata['playerstats']['stats']
+ userstats = dict()
+ for stat in statslist:
+ userstats[stat] = statslist[stat]['value']
except HTTPError:
# f.e. profile is private
+ userstats = None
+
+ if userstats:
+ if CACHE:
+ cache = Caching.writeCache('usergamestats', '%s-%s' % (str(steamid), str(gameid)), json.dumps(userstats))
+ return userstats
+ return None
+
+ def getMultipleUserUserstatsForGame(self, steamids, gameid):
+ executor = ThreadPoolExecutor(max_workers=10)
+ futures = dict()
+ for steamid in steamids:
+ futures[steamid] = executor.submit(self.getUserstatsForGame, steamid, gameid)
+
+ result = dict()
+ for steamid in steamids:
+ result[steamid] = futures[steamid].result()
+
+ return result
+
+ def getOwnedGames(self, steamid):
+ """Fetch games owned by a player.
+
+ Args:
+ steamid: Steamid of the particular user
+ Returns:
+ dict() with game_count and games, which contains appid + playtime_forever
+ """
+ if CACHE:
+ cache = Caching.readCache('userownedgames', '%s' % (str(steamid)), 7*24*60*60)
+ if cache:
+ cachedata = json.loads(cache)
+ return cachedata
+
+ url = 'http://api.steampowered.com/IPlayerService/GetOwnedGames/v0001/?key=%s&steamid=%s' % (self.token, str(steamid))
+ try:
+ response = urlopen(url)
+ data = response.read().decode('utf-8')
+ jsondata = json.loads(data)
+ if 'response' in jsondata:
+ jsondata = jsondata['response']
+ except HTTPError:
jsondata = None
- if 'playerstats' in jsondata:
+ if jsondata:
if CACHE:
- cache = Caching.writeCache('playergamestats', '%s-%s' % (str(steamid), str(gameid)), json.dumps(jsondata))
+ cache = Caching.writeCache('userownedgames', '%s' % (str(steamid)), json.dumps(jsondata))
return jsondata
return None
+ def getMultipleUserOwnedGames(self, steamids):
+ executor = ThreadPoolExecutor(max_workers=10)
+ futures = dict()
+ for steamid in steamids:
+ futures[steamid] = executor.submit(self.getOwnedGames, steamid)
+
+ result = dict()
+ for steamid in steamids:
+ result[steamid] = futures[steamid].result()
+
+ return result
+
+ def getDataForPremadefinder(self, steamids):
+ executor = ThreadPoolExecutor(max_workers=20)
+
+ futures = dict()
+ futures['profiles'] = executor.submit(self.getProfiles, steamids)
+ futures['ownedGames'] = dict()
+ futures['userstats'] = dict()
+ futures['friends'] = dict()
+
+ for steamid in steamids:
+ futures['ownedGames'][steamid] = executor.submit(self.getOwnedGames, steamid)
+ futures['userstats'][steamid] = executor.submit(self.getUserstatsForGame, steamid, 730)
+ futures['friends'][steamid] = executor.submit(self.getFriends, steamid)
+
+ profiles = futures['profiles'].result()
+ for steamid in profiles:
+ profiles[steamid]['_friends'] = futures['friends'][steamid].result()
+ profiles[steamid]['_userstats'] = futures['userstats'][steamid].result()
+ profiles[steamid]['_ownedGames'] = futures['ownedGames'][steamid].result()
+ profiles[steamid]['_ownedPlayedGames'] = len([game for game in profiles[steamid]['_ownedGames']['games'] if game['playtime_forever'] > 0])
+
+
+ return profiles
+
if __name__ == "__main__":
# TODO(andre): Maybe run tests here?
print('This is a module.')