diff options
-rwxr-xr-x | netto.py | 37 | ||||
-rwxr-xr-x | rss.py | 52 | ||||
-rwxr-xr-x | twitter.py | 135 | ||||
-rwxr-xr-x | webapp.py | 51 | ||||
-rwxr-xr-x | zdf.py | 7 |
5 files changed, 122 insertions, 160 deletions
@@ -36,34 +36,21 @@ def netto(store_id): year = str(datetime.now().year) title = url[ url.find(year) : url.find(year) + 7 ] - return title, url + return { + title: 'Netto Angebote für ' + store_id, + url: 'https://www.netto-online.de/ueber-netto/Online-Prospekte.chtm/' + store_id, + description: 'PDF der neuen Netto Angebote für den Laden um die Ecke.', + content: [{ + 'title': 'Angebote für ' + title, + 'url': url, + 'content': 'Angebote für ' + title + ' finden sich unter ' + url, + }] + } -def main(store_id = 9110): - url = 'https://www.netto-online.de/ueber-netto/Online-Prospekte.chtm/' + str(store_id) - print("""<?xml version="1.0" encoding="UTF-8"?> -<rss version="2.0"> - <channel> - <title>Netto Angebote """ + str(store_id) + """</title> - <link>""" + url + """</link> - <description>PDF der neuen Netto Angebote für den Laden um die Ecke.</description> - <lastBuildDate>""" + _format_date(datetime.now()) + """</lastBuildDate>""") - - title, link = netto(url) - print(' <item>') - print(' <title><![CDATA[Angebote für ' + title + ']]></title>') - print(' <link>' + link + '</link>') - # print(' <description><![CDATA[' + description + ']]></description>') - # print(' <pubDate>' + date + '</pubDate>') - # print(' <media:content url="' + thumbnail + b'" type="image/jpeg" />') - print(' </item>') +def main(store_id = 9110): + print(netto(store_id)) - print(' </channel>') - print('</rss>') if __name__ == "__main__": - # if len(sys.argv) != 2: - # print('Usage:', sys.argv[0], '<foobar>') - # sys.exit(1) - # main(sys.argv[1]) main() @@ -21,36 +21,40 @@ def _format_date(dt): "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"][dt.month-1], dt.year, dt.hour, dt.minute, dt.second) -def buildRSS(title, url, description, content): +def buildRSS(feed_data): """ - Feed basic info: title, url, descriptions - Content: List[Dict{title, url, content, date, enclosures, guid}] + feed_data = { + title, url, description, + content = [{ + title, url, content, date, [enclosures], guid + }] + } """ - feed = """<?xml version="1.0" encoding="UTF-8"?> + feed = f"""<?xml version="1.0" encoding="UTF-8"?> <rss version="2.0" xmlns:media="http://search.yahoo.com/mrss/"> <channel> - <title>""" + title + """</title> - <link>""" + url + """</link> - <description>""" + description + """</description> - <lastBuildDate>""" + _format_date(datetime.now()) + """</lastBuildDate>""" + <title>{feed_data['title']}</title> + <link>{feed_data['url']}</link> + <description>{feed_data['description']}</description> + <lastBuildDate>{_format_date(datetime.now())}</lastBuildDate>""" - for item in content: - feed += ' <item>' - feed += ' <title><![CDATA[' + item.get('title', 'N/A') + ']]></title>' - feed += ' <link>' + item.get('url', 'N/A') + '</link>' - feed += ' <description><![CDATA[' + item.get('content', 'N/A') + ']]></description>' - if 'date' in item: - if type(item['date']) is str: - feed += ' <pubDate>' + item['date'] + '</pubDate>' + for item in feed_data["content"]: + feed += " <item>" + feed += f" <title><![CDATA[{item.get('title', 'N/A')}]]></title>" + feed += f" <link>{item.get('url', 'N/A')}</link>" + feed += f" <description><![CDATA[{item.get('content', 'N/A')}]]></description>" + if "date" in item: + if type(item["date"]) is str: + feed += f" <pubDate>{item['date']}</pubDate>" else: - feed += ' <pubDate>' + _format_date(item['date']) + '</pubDate>' - for enclosure in item.get('enclosures', []): - feed += ' <media:content url="' + enclosure + '" />' - if 'guid' in item: - feed += ' <guid>' + item['guid'] + '</guid>' - feed += ' </item>' + feed += f" <pubDate>{_format_date(item['date'])}</pubDate>" + for enclosure in item.get("enclosures", []): + feed += f" <media:content url=\"{enclosure}\" />" + if "guid" in item: + feed += f" <guid>{item['guid']}</guid>" + feed += " </item>" - feed += ' </channel>' - feed += '</rss>' + feed += " </channel>" + feed += "</rss>" return feed @@ -106,92 +106,79 @@ def twitter(user): if not response["meta"]["result_count"]: return [] - for tweet in response["data"]: - title = tweet["text"] - description = tweet["text"] - link = "https://twitter.com/" + user + "/status/" + str(tweet["id"]) - - # Check included tweets - if ( - "referenced_tweets" in tweet - and len(tweet["referenced_tweets"]) == 1 - and tweet["referenced_tweets"][0]["type"] == "retweeted" - ): + tweets = [parse_tweet( + user, + tweet, + response["includes"]["media"], + ) for tweet in response["data"]] + + return { + 'title': 'Twitter: ' + user, + 'url': 'https://twitter.com/' + user, + 'description': 'The latest entries of the twitter account of ' + user, + 'content': tweets + } + +def parse_tweet(user, tweet, media): + title = description = tweet["text"] + link = "https://twitter.com/" + user + "/status/" + str(tweet["id"]) + + # Check included re-tweets / replace by Retweet + for rt in tweet.get("referenced_tweets", []): + + if rt["type"] == "retweeted": rt_info = title[: title.index(":") + 2] - ref_id = tweet["referenced_tweets"][0]["id"] ref_tweet = next( - t for t in response["includes"]["tweets"] if t["id"] == ref_id + t for t in response["includes"]["tweets"] if t["id"] == rt["id"] ) title = rt_info + ref_tweet["text"] description = rt_info + ref_tweet["text"] title, description = unshorten_urls( title, description, ref_tweet.get("entities", {}).get("urls", []) ) + elif rt["type"] == "replied_to": + description += f"<br/><br/>This was a reply to {rt['id']}" + else: + description += f"<br/><br/>Unknown reference type: {rt['type']}" - title, description = unshorten_urls( - title, description, tweet.get("entities", {}).get("urls", []) - ) - - # Attach media - enclosures = [] - medias = tweet.get('attachments', {}).get('media_keys', []) - for media in medias: - ref_media = next( - t for t in response["includes"]["media"] if t["media_key"] == media - ) - if 'url' not in ref_media: continue - if ref_media.get('type', '') == 'photo': - description += "<br/><img src=\"" + ref_media['url'] + "\" />" - else: - enclosures.append(ref_media['url']) - - # Append Retweets etc - description += "<br/><br/>" - description += str(tweet["public_metrics"]["retweet_count"]) + " Retweets, " - description += str(tweet["public_metrics"]["like_count"]) + " Likes, " - description += str(tweet["public_metrics"]["reply_count"]) + " Replies, " - description += str(tweet["public_metrics"]["quote_count"]) + " Quotes" - description += "<br/>" - description += "Source: " + tweet["source"] - - date = datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ") - - - yield title, description, link, date, enclosures - - -def main(channel): - print( - """<?xml version="1.0" encoding="UTF-8"?> -<rss version="2.0"> - <channel> - <title>Twitter: """ - + channel - + """</title> - <link>https://twitter.com/""" - + channel - + """</link> - <description>The latest entries of the twitter account of """ - + channel - + """</description> - <lastBuildDate>""" - + _format_date(datetime.now()) - + """</lastBuildDate>""" + title, description = unshorten_urls( + title, description, tweet.get("entities", {}).get("urls", []) ) - for title, description, link, date, enclosures in twitter(channel): - print(" <item>") - print(" <title><![CDATA[" + title + "]]></title>") - print(" <link>" + link + "</link>") - print(" <description><![CDATA[" + description + "]]></description>") - print(" <pubDate>" + _format_date(date) + "</pubDate>") - for enclosure in enclosures: - print(' <media:content url="' + enclosure + '" />') - print(" </item>") - - print(" </channel>") - print("</rss>") + # Attach media + enclosures = [] + included_media_keys = tweet.get('attachments', {}).get('media_keys', []) + for included_media_key in included_media_keys: + ref_media = next( + t for t in media if t["media_key"] == included_media_key + ) + if 'url' not in ref_media: continue + if ref_media.get('type', '') == 'photo': + description += "<br/><img src=\"" + ref_media['url'] + "\" />" + else: + enclosures.append(ref_media['url']) + + # Append Retweets etc + description += "<br/><br/>" + description += str(tweet["public_metrics"]["retweet_count"]) + " Retweets, " + description += str(tweet["public_metrics"]["like_count"]) + " Likes, " + description += str(tweet["public_metrics"]["reply_count"]) + " Replies, " + description += str(tweet["public_metrics"]["quote_count"]) + " Quotes" + description += "<br/>" + description += "Source: " + tweet["source"] + + date = datetime.strptime(tweet["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ") + + return { + "title": title, + "url": link, + "content": description, + "date": date, + "enclosures": enclosures, + } +def main(channel): + print(twitter(channel)) if __name__ == "__main__": if len(sys.argv) != 2: @@ -35,55 +35,34 @@ def not_found(e): @app.route("/twitter/<account>") def feedTwitter(account): - content = [{'title': t, 'url': u, 'content': c, 'date': d, 'enclosures': e} - for t,c,u,d,e in twitter(account)] - xml = buildRSS( - title = 'Twitter: ' + account, - url = 'https://twitter.com/' + account, - description = 'The latest entries of the twitter account of ' + account, - content = content) + xml = buildRSS(twitter(account)) response = Response(xml, mimetype='text/xml') response.headers['Access-Control-Allow-Origin'] = '*' return response -@app.route("/telegram/<account>") -def feedTelegram(account): - content = [{'title': t, 'url': u, 'content': c, 'date': d} - for t,c,u,d in telegram(account)] - xml = buildRSS( - title = 'Telegram: ' + account, - url = 'https://t.me/s/' + account, - description = 'The latest entries of the telegram channel of ' + account, - content = content) - response = Response(xml, mimetype='text/xml') - response.headers['Access-Control-Allow-Origin'] = '*' - return response +# @app.route("/telegram/<account>") +# def feedTelegram(account): +# content = [{'title': t, 'url': u, 'content': c, 'date': d} +# for t,c,u,d in telegram(account)] +# xml = buildRSS( +# title = 'Telegram: ' + account, +# url = 'https://t.me/s/' + account, +# description = 'The latest entries of the telegram channel of ' + account, +# content = content) +# response = Response(xml, mimetype='text/xml') +# response.headers['Access-Control-Allow-Origin'] = '*' +# return response @app.route("/netto/<market>") def feedNetto(market): - title, url = netto(market) - content = [{ - 'title': 'Angebote für ' + title, - 'url': url, - 'content': 'Angebote für ' + title + ' finden sich unter ' + url, - }] - xml = buildRSS( - title = 'Netto Angebote für ' + market, - url = 'https://www.netto-online.de/ueber-netto/Online-Prospekte.chtm/' + market, - description = 'PDF der neuen Netto Angebote für den Laden um die Ecke.', - content = content) + xml = buildRSS(netto(market)) response = Response(xml, mimetype='text/xml') response.headers['Access-Control-Allow-Origin'] = '*' return response @app.route("/zdf/<path:feed>") def filterZDFFeed(feed): - title, url, description, content = zdf(feed) - xml = buildRSS( - title = title, - url = url, - description = description, - content = content) + xml = buildRSS(zdf(feed)) response = Response(xml, mimetype='text/xml') response.headers['Access-Control-Allow-Origin'] = '*' return response @@ -68,7 +68,12 @@ def zdf(feed): 'guid': s_guid, }) - return title, url, description, content + return { + "title": title, + "url": url, + "description": description, + "content": content, + } except Exception as exc: logging.error('Working with zdf failed.', exc_info=exc) return None |