From 5774dbfb2caa42cb55bafab98a40e47f395e44d9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Andr=C3=A9=20Gl=C3=BCpker?=
Date: Wed, 5 May 2021 20:09:30 +0200
Subject: Initial commit of RSS converter application

---
 twitter.py | 202 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 202 insertions(+)
 create mode 100755 twitter.py

diff --git a/twitter.py b/twitter.py
new file mode 100755
index 0000000..5ddf8ad
--- /dev/null
+++ b/twitter.py
@@ -0,0 +1,202 @@
+#!/usr/bin/env python3
+
+from urllib.error import HTTPError
+from urllib.request import urlopen, Request
+import logging
+
+# from requests_oauthlib import OAuth1Session
+from datetime import datetime
+import sys
+import json
+
+bearer = None
+
+
+def _format_date(dt):
+    """convert a datetime into an RFC 822 formatted date
+    Input date must be in GMT.
+    Stolen from PyRSS2Gen.
+    """
+    # Looks like:
+    #   Sat, 07 Sep 2002 00:00:01 GMT
+    # Can't use strftime because that's locale dependent
+    #
+    # Isn't there a standard way to do this for Python?  The
+    # rfc822 and email.Utils modules assume a timestamp.  The
+    # following is based on the rfc822 module.
+    return "%s, %02d %s %04d %02d:%02d:%02d GMT" % (
+        ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"][dt.weekday()],
+        dt.day,
+        [
+            "Jan",
+            "Feb",
+            "Mar",
+            "Apr",
+            "May",
+            "Jun",
+            "Jul",
+            "Aug",
+            "Sep",
+            "Oct",
+            "Nov",
+            "Dec",
+        ][dt.month - 1],
+        dt.year,
+        dt.hour,
+        dt.minute,
+        dt.second,
+    )
+
+
+def getBearer():
+    global bearer
+    if bearer:
+        return bearer
+    headers = {
+        "Authorization": "Basic Zzl1MXI2SFpYTXg0SXU5UGs5VlNvTzFUdzpmeTIyQjN2QVRRNUI2eGthb1BFdFFRUmtuUGQ1WGZBbnBKVG5hc0ZRa3NyUm5qaVNsaw==",
+        "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
+    }
+    data = b"grant_type=client_credentials"
+    url = "https://api.twitter.com/oauth2/token"
+
+    res = urlopen(Request(url, headers=headers, data=data, method="POST"))
+    response = json.loads(res.read().decode("UTF-8"))
+    bearer = response["access_token"]
+
+    return bearer
+
+
+def unshorten_urls(title, description, urls):
+    # Replace t.co short links with the expanded URL; use the preview image
+    # as link text in the description when one is available.
+    for url in urls:
+        shorted_url = url["url"]
+        long_url = url["expanded_url"]
+
+        if "images" in url:
+            img = url["images"][0]["url"]
+            long_url_html = '<a href="' + long_url + '"><img src="' + img + '"/></a>'
+        else:
+            long_url_html = '<a href="' + long_url + '">' + long_url + "</a>"
+
+        description = description.replace(shorted_url, long_url_html)
+        title = title.replace(shorted_url, long_url)
+    return title, description
+
+
+def twitter(user):
+    # 500.000 Tweets per month
+    # API KEY = g9u1r6HZXMx4Iu9Pk9VSoO1Tw
+    # API SECRET KEY = fy22B3vATQ5B6xkaoPEtQQRknPd5XfAnpJTnasFQksrRnjiSlk
+
+    headers = {"authorization": "Bearer " + getBearer()}
+
+    # Recent = last 7 days
+    url = (
+        "https://api.twitter.com/2/tweets/search/recent?query=from:"
+        + user
+        + "&tweet.fields=created_at,author_id,lang,source,public_metrics,entities&expansions=referenced_tweets.id,attachments.media_keys&media.fields=url"
+    )
+
+    try:
+        res = urlopen(Request(url, headers=headers))
+        response = json.loads(res.read().decode("UTF-8"))
+    except Exception as exc:
+        logging.error('Request to twitter failed.', exc_info=exc)
+        return None
+
+    if not response["meta"]["result_count"]:
+        return []
+
+    for tweet in response["data"]:
+        title = tweet["text"]
+        description = tweet["text"]
+        link = "https://twitter.com/" + user + "/status/" + str(tweet["id"])
+
+        # Check included tweets
+        if (
+            "referenced_tweets" in tweet
+            and len(tweet["referenced_tweets"]) == 1
+            and tweet["referenced_tweets"][0]["type"] == "retweeted"
+        ):
+            rt_info = title[: title.index(":") + 2]
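+            # Retweet text is truncated in the search result, so look up the
+            # referenced tweet in the "includes" payload and use its full text.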
+            ref_id = tweet["referenced_tweets"][0]["id"]
+            ref_tweet = next(
+                t for t in response["includes"]["tweets"] if t["id"] == ref_id
+            )
+            title = rt_info + ref_tweet["text"]
+            description = rt_info + ref_tweet["text"]
+            title, description = unshorten_urls(
+                title, description, ref_tweet.get("entities", {}).get("urls", [])
+            )
+
+        title, description = unshorten_urls(
+            title, description, tweet.get("entities", {}).get("urls", [])
+        )
+
+        # Attach media
+        enclosures = []
+        medias = tweet.get('attachments', {}).get('media_keys', [])
+        for media in medias:
+            ref_media = next(
+                t for t in response["includes"]["media"] if t["media_key"] == media
+            )
+            if 'url' not in ref_media: continue
+            if ref_media.get('type', '') == 'photo':
+                description += "<br/><img src='" + ref_media['url'] + "'/>"
+            else:
+                enclosures.append(ref_media['url'])
+
+        # Append Retweets etc
+        description += "<br/><br/>"
+        description += str(tweet["public_metrics"]["retweet_count"]) + " Retweets, "
+        description += str(tweet["public_metrics"]["like_count"]) + " Likes, "
+        description += str(tweet["public_metrics"]["reply_count"]) + " Replies, "
+        description += str(tweet["public_metrics"]["quote_count"]) + " Quotes"
+        description += "<br/>"
+        description += "Source: " + tweet["source"]
+
+        date = datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
+
+        yield title, description, link, date, enclosures
+
+
+def main(channel):
+    print(
+        """<?xml version="1.0" encoding="UTF-8"?>
+<rss version="2.0">
+  <channel>
+    <title>Twitter: """
+        + channel
+        + """</title>
+    <link>https://twitter.com/"""
+        + channel
+        + """</link>
+    <description>The latest entries of the twitter account of """
+        + channel
+        + """</description>
+    <lastBuildDate>"""
+        + _format_date(datetime.now())
+        + """</lastBuildDate>"""
+    )
+
+    for title, description, link, date, enclosures in twitter(channel):
+        print("    <item>")
+        print("      <title><![CDATA[" + title + "]]></title>")
+        print("      <link>" + link + "</link>")
+        print("      <description><![CDATA[" + description + "]]></description>")
+        print("      <pubDate>" + _format_date(date) + "</pubDate>")
+        for enclosure in enclosures:
+            print('      <enclosure url="' + enclosure + '" length="0" type="image/jpeg"/>')
+        print("    </item>")
+
+    print("  </channel>")
+    print("</rss>")
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print("Usage:", sys.argv[0], "<username>")
+        sys.exit(1)
+    main(sys.argv[1])
+    # twitter('rheinbahn_intim')
+    # twitter('realDonaldTrump')
--
cgit v1.2.3