diff options
author | André Glüpker <git@wgmd.de> | 2021-05-05 20:09:30 +0200 |
---|---|---|
committer | André Glüpker <git@wgmd.de> | 2021-05-05 20:09:30 +0200 |
commit | 5774dbfb2caa42cb55bafab98a40e47f395e44d9 (patch) | |
tree | 8294b7b6fefebc1befeed4104f3b5604683999a8 /twitter.py | |
download | rss-feeds-5774dbfb2caa42cb55bafab98a40e47f395e44d9.tar.gz rss-feeds-5774dbfb2caa42cb55bafab98a40e47f395e44d9.tar.bz2 rss-feeds-5774dbfb2caa42cb55bafab98a40e47f395e44d9.zip |
Initial commit of RSS converter application
Diffstat (limited to 'twitter.py')
-rwxr-xr-x | twitter.py | 202 |
1 files changed, 202 insertions, 0 deletions
#!/usr/bin/env python3
"""Render the recent tweets of a Twitter account as an RSS 2.0 feed on stdout."""

from urllib.error import HTTPError
from urllib.parse import quote
from urllib.request import urlopen, Request
import logging

# from requests_oauthlib import OAuth1Session
from datetime import datetime, timezone
from html import escape
import os
import sys
import json

# NOTE(security): application credentials are committed to source control in
# this header value (base64 of "<api key>:<api secret>"). They should be
# rotated and supplied from outside the repository; setting the
# TWITTER_BASIC_AUTH environment variable to a full "Basic ..." header value
# overrides the embedded default.
_DEFAULT_BASIC_AUTH = "Basic Zzl1MXI2SFpYTXg0SXU5UGs5VlNvTzFUdzpmeTIyQjN2QVRRNUI2eGthb1BFdFFRUmtuUGQ1WGZBbnBKVG5hc0ZRa3NyUm5qaVNsaw=="

# Namespace of the Media RSS extension; must be declared on <rss> because the
# items may contain <media:content> elements.
_MEDIA_NAMESPACE = "http://search.yahoo.com/mrss/"

_WEEKDAYS = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
_MONTHS = (
    "Jan",
    "Feb",
    "Mar",
    "Apr",
    "May",
    "Jun",
    "Jul",
    "Aug",
    "Sep",
    "Oct",
    "Nov",
    "Dec",
)

# Cached OAuth2 bearer token, filled in lazily by getBearer().
bearer = None


def _format_date(dt):
    """Convert a datetime into an RFC 822 formatted date.

    A naive datetime must already be in GMT. A timezone-aware datetime is
    converted to UTC first, so the "GMT" suffix is always truthful.
    Stolen from PyRSS2Gen.
    """
    # Looks like:
    #   Sat, 07 Sep 2002 00:00:01 GMT
    # Can't use strftime because that's locale dependent.
    if dt.utcoffset() is not None:
        dt = dt.astimezone(timezone.utc)
    return "%s, %02d %s %04d %02d:%02d:%02d GMT" % (
        _WEEKDAYS[dt.weekday()],
        dt.day,
        _MONTHS[dt.month - 1],
        dt.year,
        dt.hour,
        dt.minute,
        dt.second,
    )


def getBearer():
    """Return the OAuth2 application bearer token, fetching it on first use.

    The token is cached in the module-level ``bearer`` variable, so only the
    first call performs an HTTP request. Network and decoding errors from
    that request propagate to the caller.
    """
    global bearer
    if bearer:
        return bearer
    headers = {
        "Authorization": os.environ.get("TWITTER_BASIC_AUTH") or _DEFAULT_BASIC_AUTH,
        "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
    }
    data = b"grant_type=client_credentials"
    url = "https://api.twitter.com/oauth2/token"

    # The context manager closes the HTTP response even if decoding fails.
    with urlopen(Request(url, headers=headers, data=data, method="POST")) as res:
        response = json.loads(res.read().decode("UTF-8"))
    bearer = response["access_token"]

    return bearer


def unshorten_urls(title, description, urls):
    """Replace shortened URLs in a tweet with their expanded targets.

    ``urls`` is a list of URL entity dicts with a ``url`` key and optionally
    ``expanded_url`` and ``images``. In ``title`` the short URL is replaced by
    the plain expanded URL; in ``description`` it is replaced by an HTML
    anchor (wrapping the first preview image when one is available).

    Returns the updated ``(title, description)`` pair.
    """
    for url in urls:
        shorted_url = url["url"]
        # Fall back to the short URL when no expansion is provided.
        long_url = url.get("expanded_url", shorted_url)
        # The description is HTML, so URLs must be escaped inside attributes
        # and text; the title stays plain text.
        href = escape(long_url, quote=True)

        images = url.get("images")
        if images:
            img = escape(images[0]["url"], quote=True)
            long_url_html = '<a href="' + href + '"><img src="' + img + '"/></a>'
        else:
            long_url_html = '<a href="' + href + '">' + href + "</a>"

        description = description.replace(shorted_url, long_url_html)
        title = title.replace(shorted_url, long_url)
    return title, description


def _index_by(entries, key):
    """Map ``entry[key]`` to the first entry carrying that value."""
    index = {}
    for entry in entries:
        if key in entry:
            index.setdefault(entry[key], entry)
    return index


def _tweet_to_item(tweet, user, ref_tweets, media_by_key):
    """Build one ``(title, description, link, date, enclosures)`` feed item.

    ``ref_tweets`` maps tweet id to the included referenced tweet and
    ``media_by_key`` maps media key to the included media object.
    """
    title = tweet["text"]
    description = tweet["text"]
    link = "https://twitter.com/" + user + "/status/" + str(tweet["id"])

    # A retweet's own text is truncated; rebuild it from the referenced tweet
    # while keeping the leading "RT @user: " marker.
    references = tweet.get("referenced_tweets") or []
    if len(references) == 1 and references[0].get("type") == "retweeted":
        ref_tweet = ref_tweets.get(references[0].get("id"))
        if ref_tweet is not None:
            colon = title.find(":")
            rt_info = title[: colon + 2] if colon != -1 else ""
            title = rt_info + ref_tweet["text"]
            description = rt_info + ref_tweet["text"]
            title, description = unshorten_urls(
                title, description, ref_tweet.get("entities", {}).get("urls", [])
            )

    title, description = unshorten_urls(
        title, description, tweet.get("entities", {}).get("urls", [])
    )

    # Attach media: photos are inlined, everything else becomes an enclosure.
    enclosures = []
    for media_key in tweet.get("attachments", {}).get("media_keys", []):
        ref_media = media_by_key.get(media_key)
        if ref_media is None or "url" not in ref_media:
            continue
        if ref_media.get("type", "") == "photo":
            description += (
                '<br/><img src="' + escape(ref_media["url"], quote=True) + '" />'
            )
        else:
            enclosures.append(ref_media["url"])

    # Append Retweets etc
    metrics = tweet.get("public_metrics", {})
    description += "<br/><br/>"
    description += "{} Retweets, {} Likes, {} Replies, {} Quotes".format(
        metrics.get("retweet_count", 0),
        metrics.get("like_count", 0),
        metrics.get("reply_count", 0),
        metrics.get("quote_count", 0),
    )
    # The source field is optional in API responses; omit the line without it.
    if "source" in tweet:
        description += "<br/>"
        description += "Source: " + tweet["source"]

    # The timestamp carries a literal "Z", i.e. the naive result is in UTC.
    date = datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")

    return title, description, link, date, enclosures


def twitter(user):
    """Yield feed items for the tweets ``user`` posted in the last 7 days.

    This is a generator. Each item is a tuple
    ``(title, description, link, date, enclosures)`` where ``date`` is a naive
    UTC datetime and ``enclosures`` is a list of media URLs. If the request
    fails, the error is logged and nothing is yielded.
    """
    # 500.000 Tweets per month
    # Recent = last 7 days
    url = (
        "https://api.twitter.com/2/tweets/search/recent?query=from:"
        + quote(user, safe="")
        + "&tweet.fields=created_at,author_id,lang,source,public_metrics,entities&expansions=referenced_tweets.id,attachments.media_keys&media.fields=url"
    )

    try:
        # Fetching the token is part of the request and may fail the same way.
        headers = {"authorization": "Bearer " + getBearer()}
        with urlopen(Request(url, headers=headers)) as res:
            response = json.loads(res.read().decode("UTF-8"))
    except Exception as exc:
        logging.error('Request to twitter failed.', exc_info=exc)
        return

    if not response.get("meta", {}).get("result_count"):
        return

    # Build the lookups once instead of scanning the includes for every tweet.
    includes = response.get("includes", {})
    ref_tweets = _index_by(includes.get("tweets", []), "id")
    media_by_key = _index_by(includes.get("media", []), "media_key")

    for tweet in response.get("data", []):
        yield _tweet_to_item(tweet, user, ref_tweets, media_by_key)


def _cdata(text):
    """Wrap ``text`` in a CDATA section, splitting any embedded ``]]>``."""
    return "<![CDATA[" + text.replace("]]>", "]]]]><![CDATA[>") + "]]>"


def main(channel):
    """Print an RSS 2.0 document for the Twitter account ``channel`` to stdout."""
    safe_channel = escape(channel, quote=True)
    print(
        """<?xml version="1.0" encoding="UTF-8"?>
<rss version="2.0" xmlns:media=\""""
        + _MEDIA_NAMESPACE
        + """">
  <channel>
    <title>Twitter: """
        + safe_channel
        + """</title>
    <link>https://twitter.com/"""
        + safe_channel
        + """</link>
    <description>The latest entries of the twitter account of """
        + safe_channel
        + """</description>
    <lastBuildDate>"""
        # The formatted date is labelled GMT, so take the current time in UTC.
        + _format_date(datetime.now(timezone.utc))
        + """</lastBuildDate>"""
    )

    for title, description, link, date, enclosures in twitter(channel):
        print("    <item>")
        print("      <title>" + _cdata(title) + "</title>")
        print("      <link>" + escape(link, quote=True) + "</link>")
        print("      <description>" + _cdata(description) + "</description>")
        print("      <pubDate>" + _format_date(date) + "</pubDate>")
        for enclosure in enclosures:
            print('      <media:content url="' + escape(enclosure, quote=True) + '" />')
        print("    </item>")

    print("  </channel>")
    print("</rss>")


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage:", sys.argv[0], "<twitter channel>")
        sys.exit(1)
    main(sys.argv[1])
    # e.g. ./twitter.py rheinbahn_intim