summaryrefslogtreecommitdiff
path: root/twitter.py
diff options
context:
space:
mode:
authorAndré Glüpker <git@wgmd.de>2021-05-05 20:09:30 +0200
committerAndré Glüpker <git@wgmd.de>2021-05-05 20:09:30 +0200
commit5774dbfb2caa42cb55bafab98a40e47f395e44d9 (patch)
tree8294b7b6fefebc1befeed4104f3b5604683999a8 /twitter.py
downloadrss-feeds-5774dbfb2caa42cb55bafab98a40e47f395e44d9.tar.gz
rss-feeds-5774dbfb2caa42cb55bafab98a40e47f395e44d9.tar.bz2
rss-feeds-5774dbfb2caa42cb55bafab98a40e47f395e44d9.zip
Initial commit of RSS converter application
Diffstat (limited to 'twitter.py')
-rwxr-xr-xtwitter.py202
1 files changed, 202 insertions, 0 deletions
diff --git a/twitter.py b/twitter.py
new file mode 100755
index 0000000..5ddf8ad
--- /dev/null
+++ b/twitter.py
@@ -0,0 +1,202 @@
+#!/usr/bin/env python3
+
+from urllib.error import HTTPError
+from urllib.request import urlopen, Request
+import logging
+
+# from requests_oauthlib import OAuth1Session
+from datetime import datetime
+import sys
+import json
+
+bearer = None
+
+
+def _format_date(dt):
+ """convert a datetime into an RFC 822 formatted date
+ Input date must be in GMT.
+ Stolen from PyRSS2Gen.
+ """
+ # Looks like:
+ # Sat, 07 Sep 2002 00:00:01 GMT
+ # Can't use strftime because that's locale dependent
+ #
+ # Isn't there a standard way to do this for Python? The
+ # rfc822 and email.Utils modules assume a timestamp. The
+ # following is based on the rfc822 module.
+ return "%s, %02d %s %04d %02d:%02d:%02d GMT" % (
+ ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"][dt.weekday()],
+ dt.day,
+ [
+ "Jan",
+ "Feb",
+ "Mar",
+ "Apr",
+ "May",
+ "Jun",
+ "Jul",
+ "Aug",
+ "Sep",
+ "Oct",
+ "Nov",
+ "Dec",
+ ][dt.month - 1],
+ dt.year,
+ dt.hour,
+ dt.minute,
+ dt.second,
+ )
+
+
+def getBearer():
+ global bearer
+ if bearer:
+ return bearer
+ headers = {
+ "Authorization": "Basic Zzl1MXI2SFpYTXg0SXU5UGs5VlNvTzFUdzpmeTIyQjN2QVRRNUI2eGthb1BFdFFRUmtuUGQ1WGZBbnBKVG5hc0ZRa3NyUm5qaVNsaw==",
+ "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
+ }
+ data = b"grant_type=client_credentials"
+ url = "https://api.twitter.com/oauth2/token"
+
+ res = urlopen(Request(url, headers=headers, data=data, method="POST"))
+ response = json.loads(res.read().decode("UTF-8"))
+ bearer = response["access_token"]
+
+ return bearer
+
+
+def unshorten_urls(title, description, urls):
+ for url in urls:
+ shorted_url = url["url"]
+ long_url = url["expanded_url"]
+
+ if "images" in url:
+ img = url["images"][0]["url"]
+ long_url_html = '<a href="' + long_url + '"><img src="' + img + '"/></a>'
+ else:
+ long_url_html = '<a href="' + long_url + '">' + long_url + "</a>"
+
+ description = description.replace(shorted_url, long_url_html)
+ title = title.replace(shorted_url, long_url)
+ return title, description
+
+
+def twitter(user):
+ # 500.000 Tweets per month
+ # API KEY = g9u1r6HZXMx4Iu9Pk9VSoO1Tw
+ # API SECRET KEY = fy22B3vATQ5B6xkaoPEtQQRknPd5XfAnpJTnasFQksrRnjiSlk
+
+ headers = {"authorization": "Bearer " + getBearer()}
+
+ # Recent = last 7 days
+ url = (
+ "https://api.twitter.com/2/tweets/search/recent?query=from:"
+ + user
+ + "&tweet.fields=created_at,author_id,lang,source,public_metrics,entities&expansions=referenced_tweets.id,attachments.media_keys&media.fields=url"
+ )
+
+ try:
+ res = urlopen(Request(url, headers=headers))
+ response = json.loads(res.read().decode("UTF-8"))
+ except Exception as exc:
+ logging.error('Request to twitter failed.', exc_info=exc)
+ return None
+
+ if not response["meta"]["result_count"]:
+ return []
+
+ for tweet in response["data"]:
+ title = tweet["text"]
+ description = tweet["text"]
+ link = "https://twitter.com/" + user + "/status/" + str(tweet["id"])
+
+ # Check included tweets
+ if (
+ "referenced_tweets" in tweet
+ and len(tweet["referenced_tweets"]) == 1
+ and tweet["referenced_tweets"][0]["type"] == "retweeted"
+ ):
+ rt_info = title[: title.index(":") + 2]
+ ref_id = tweet["referenced_tweets"][0]["id"]
+ ref_tweet = next(
+ t for t in response["includes"]["tweets"] if t["id"] == ref_id
+ )
+ title = rt_info + ref_tweet["text"]
+ description = rt_info + ref_tweet["text"]
+ title, description = unshorten_urls(
+ title, description, ref_tweet.get("entities", {}).get("urls", [])
+ )
+
+ title, description = unshorten_urls(
+ title, description, tweet.get("entities", {}).get("urls", [])
+ )
+
+ # Attach media
+ enclosures = []
+ medias = tweet.get('attachments', {}).get('media_keys', [])
+ for media in medias:
+ ref_media = next(
+ t for t in response["includes"]["media"] if t["media_key"] == media
+ )
+ if 'url' not in ref_media: continue
+ if ref_media.get('type', '') == 'photo':
+ description += "<br/><img src=\"" + ref_media['url'] + "\" />"
+ else:
+ enclosures.append(ref_media['url'])
+
+ # Append Retweets etc
+ description += "<br/><br/>"
+ description += str(tweet["public_metrics"]["retweet_count"]) + " Retweets, "
+ description += str(tweet["public_metrics"]["like_count"]) + " Likes, "
+ description += str(tweet["public_metrics"]["reply_count"]) + " Replies, "
+ description += str(tweet["public_metrics"]["quote_count"]) + " Quotes"
+ description += "<br/>"
+ description += "Source: " + tweet["source"]
+
+ date = datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ")
+
+
+ yield title, description, link, date, enclosures
+
+
+def main(channel):
+ print(
+ """<?xml version="1.0" encoding="UTF-8"?>
+<rss version="2.0">
+ <channel>
+ <title>Twitter: """
+ + channel
+ + """</title>
+ <link>https://twitter.com/"""
+ + channel
+ + """</link>
+ <description>The latest entries of the twitter account of """
+ + channel
+ + """</description>
+ <lastBuildDate>"""
+ + _format_date(datetime.now())
+ + """</lastBuildDate>"""
+ )
+
+ for title, description, link, date, enclosures in twitter(channel):
+ print(" <item>")
+ print(" <title><![CDATA[" + title + "]]></title>")
+ print(" <link>" + link + "</link>")
+ print(" <description><![CDATA[" + description + "]]></description>")
+ print(" <pubDate>" + _format_date(date) + "</pubDate>")
+ for enclosure in enclosures:
+ print(' <media:content url="' + enclosure + '" />')
+ print(" </item>")
+
+ print(" </channel>")
+ print("</rss>")
+
+
+if __name__ == "__main__":
+ if len(sys.argv) != 2:
+ print("Usage:", sys.argv[0], "<twitter channel>")
+ sys.exit(1)
+ main(sys.argv[1])
+ # twitter('rheinbahn_intim')
+ # twitter('realDonaldTrump')