From a056cdf87b88fcc086d5426880cef5a37661c6e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Gl=C3=BCpker?= Date: Fri, 30 Jul 2021 12:52:11 +0200 Subject: Fetch twitter replies / quoted tweets --- twitter.py | 34 ++++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) (limited to 'twitter.py') diff --git a/twitter.py b/twitter.py index 51dd51f..673fce7 100755 --- a/twitter.py +++ b/twitter.py @@ -83,6 +83,7 @@ def twitter(user): tweet, response.get("includes", {}).get("tweets", []), response.get("includes", {}).get("media", []), + headers, ) for tweet in response["data"] ] @@ -90,13 +91,13 @@ def twitter(user): return feed -def parse_tweet(user, tweet, included_tweets, included_media): +def parse_tweet(user, tweet, included_tweets, included_media, headers): title = description = tweet["text"] link = "https://twitter.com/" + user + "/status/" + str(tweet["id"]) # Check included re-tweets / replace by Retweet + ref_enclosures = [] for rt in tweet.get("referenced_tweets", []): - if rt["type"] == "retweeted": rt_info = title[: title.index(":") + 2] ref_tweet = next(t for t in included_tweets if t["id"] == rt["id"]) @@ -107,8 +108,14 @@ def parse_tweet(user, tweet, included_tweets, included_media): ) elif rt["type"] == "replied_to": description += "
This was a reply to: " + rt["id"] + text, enclosures = fetch_single_tweet(rt["id"], headers) + description += text + ref_enclosures.extend(enclosures) elif rt["type"] == "quoted": description += "
Quoted tweet: " + rt["id"] + text, enclosures = fetch_single_tweet(rt["id"], headers) + description += text + ref_enclosures.extend(enclosures) else: description += f"

Unknown reference type: {rt['type']}" @@ -129,6 +136,7 @@ def parse_tweet(user, tweet, included_tweets, included_media): description += '
' else: enclosures.append(ref_media["url"]) + enclosures.extend(ref_enclosures) # Append Retweets etc description += "

" @@ -150,6 +158,28 @@ def parse_tweet(user, tweet, included_tweets, included_media): } +def fetch_single_tweet(id, headers): + url = f"https://api.twitter.com/2/tweets/{id}?tweet.fields=entities&expansions=attachments.media_keys&media.fields=url" + try: + res = urlopen(Request(url, headers=headers)) + response = json.loads(res.read().decode("UTF-8")) + except Exception as exc: + logging.error("Request to twitter failed (single tweet).", exc_info=exc) + return None + + text = response['data'].get('text', 'no text') + + enclosures = [] + for media in response['data'].get('includes', {}).get('media', []): + if "url" not in media: + continue + if media.get("type", "") == "photo": + text += '
' + else: + enclosures.append(media["url"]) + + return text, enclosures + def main(channel): print(twitter(channel)) -- cgit v1.2.3