diff options
-rwxr-xr-x | netto.py | 49 | ||||
-rwxr-xr-x | rss.py | 56 | ||||
-rwxr-xr-x | telegram.py | 78 | ||||
-rwxr-xr-x | twitter.py | 72 | ||||
-rwxr-xr-x | webapp.py | 20 | ||||
-rwxr-xr-x | wsgi.py | 3 | ||||
-rwxr-xr-x | zdf.py | 43 |
7 files changed, 154 insertions, 167 deletions
@@ -5,50 +5,37 @@ from datetime import datetime from bs4 import BeautifulSoup import sys -def _format_date(dt): - """convert a datetime into an RFC 822 formatted date - Input date must be in GMT. - Stolen from PyRSS2Gen. - """ - # Looks like: - # Sat, 07 Sep 2002 00:00:01 GMT - # Can't use strftime because that's locale dependent - # - # Isn't there a standard way to do this for Python? The - # rfc822 and email.Utils modules assume a timestamp. The - # following is based on the rfc822 module. - return "%s, %02d %s %04d %02d:%02d:%02d GMT" % ( - ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"][dt.weekday()], - dt.day, - ["Jan", "Feb", "Mar", "Apr", "May", "Jun", - "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"][dt.month-1], - dt.year, dt.hour, dt.minute, dt.second) def netto(store_id): - url = 'https://www.netto-online.de/ueber-netto/Online-Prospekte.chtm/' + str(store_id) + url = "https://www.netto-online.de/ueber-netto/Online-Prospekte.chtm/" + str( + store_id + ) res = urlopen(Request(url)) soup = BeautifulSoup(res, features="html.parser") # messages = soup.find_all('div', attrs={'class': 'tgme_widget_message_wrap'}) - message = soup.find('a', attrs={'class': 'flipbook_pdf_flipbook'}) + message = soup.find("a", attrs={"class": "flipbook_pdf_flipbook"}) - url = message['href'].split('?')[0] + url = message["href"].split("?")[0] year = str(datetime.now().year) - title = url[ url.find(year) : url.find(year) + 7 ] + title = url[url.find(year) : url.find(year) + 7] return { - title: 'Netto Angebote für ' + store_id, - url: 'https://www.netto-online.de/ueber-netto/Online-Prospekte.chtm/' + store_id, - description: 'PDF der neuen Netto Angebote für den Laden um die Ecke.', - content: [{ - 'title': 'Angebote für ' + title, - 'url': url, - 'content': 'Angebote für ' + title + ' finden sich unter ' + url, - }] + title: "Netto Angebote für " + store_id, + url: "https://www.netto-online.de/ueber-netto/Online-Prospekte.chtm/" + + store_id, + description: "PDF der neuen Netto Angebote für den Laden um die Ecke.", + content: [ + { + "title": "Angebote für " + title, + "url": url, + "content": "Angebote für " + title + " finden sich unter " + url, + } + ], } -def main(store_id = 9110): +def main(store_id=9110): print(netto(store_id)) @@ -3,6 +3,7 @@ from datetime import datetime from typing import List + def _format_date(dt): """convert a datetime into an RFC 822 formatted date Input date must be in GMT. @@ -15,12 +16,39 @@ def _format_date(dt): # Isn't there a standard way to do this for Python? The # rfc822 and email.Utils modules assume a timestamp. The # following is based on the rfc822 module. + weekdays = [ + "Mon", + "Tue", + "Wed", + "Thu", + "Fri", + "Sat", + "Sun", + ] + months = [ + "Jan", + "Feb", + "Mar", + "Apr", + "May", + "Jun", + "Jul", + "Aug", + "Sep", + "Oct", + "Nov", + "Dec", + ] return "%s, %02d %s %04d %02d:%02d:%02d GMT" % ( - ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"][dt.weekday()], - dt.day, - ["Jan", "Feb", "Mar", "Apr", "May", "Jun", - "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"][dt.month-1], - dt.year, dt.hour, dt.minute, dt.second) + weekdays[dt.weekday()], + dt.day, + months[dt.month - 1], + dt.year, + dt.hour, + dt.minute, + dt.second, + ) + class RSSItem: title: str @@ -40,12 +68,12 @@ class RSSFeed: def buildRSS(feed_data: RSSFeed): """ - feed_data = { - title, url, description, - content = [{ - title, url, content, date, [enclosures], guid - }] - } + feed_data = { + title, url, description, + content = [{ + title, url, content, date, [enclosures], guid + }] + } """ feed = f"""<?xml version="1.0" encoding="UTF-8"?> @@ -60,14 +88,16 @@ def buildRSS(feed_data: RSSFeed): feed += " <item>" feed += f" <title><![CDATA[{item.get('title', 'N/A')}]]></title>" feed += f" <link>{item.get('url', 'N/A')}</link>" - feed += f" <description><![CDATA[{item.get('content', 'N/A')}]]></description>" + feed += ( + f" <description><![CDATA[{item.get('content', 'N/A')}]]></description>" + ) if "date" in item: if type(item["date"]) is str: feed += f" <pubDate>{item['date']}</pubDate>" else: feed += f" <pubDate>{_format_date(item['date'])}</pubDate>" for enclosure in item.get("enclosures", []): - feed += f" <media:content url=\"{enclosure}\" />" + feed += f' <media:content url="{enclosure}" />' if "guid" in item: feed += f" <guid>{item['guid']}</guid>" feed += " </item>" diff --git a/telegram.py b/telegram.py index 3058339..d95ce34 100755 --- a/telegram.py +++ b/telegram.py @@ -5,68 +5,64 @@ from datetime import datetime from bs4 import BeautifulSoup import sys -def _format_date(dt): - """convert a datetime into an RFC 822 formatted date - Input date must be in GMT. - Stolen from PyRSS2Gen. - """ - # Looks like: - # Sat, 07 Sep 2002 00:00:01 GMT - # Can't use strftime because that's locale dependent - # - # Isn't there a standard way to do this for Python? The - # rfc822 and email.Utils modules assume a timestamp. The - # following is based on the rfc822 module. - return "%s, %02d %s %04d %02d:%02d:%02d GMT" % ( - ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"][dt.weekday()], - dt.day, - ["Jan", "Feb", "Mar", "Apr", "May", "Jun", - "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"][dt.month-1], - dt.year, dt.hour, dt.minute, dt.second) +from rss import _format_date + def telegram(channel): - url = 'https://t.me/s/' + channel + url = "https://t.me/s/" + channel res = urlopen(Request(url)) soup = BeautifulSoup(res, features="html.parser") # messages = soup.find_all('div', attrs={'class': 'tgme_widget_message_wrap'}) - messages = soup.find_all('div', attrs={'class': 'tgme_widget_message_bubble'}) + messages = soup.find_all("div", attrs={"class": "tgme_widget_message_bubble"}) for message in messages: - date = message.find('time', attrs={'class': 'time'})['datetime'] - html = message.find('div', attrs={'class': 'tgme_widget_message_text'}) + date = message.find("time", attrs={"class": "time"})["datetime"] + html = message.find("div", attrs={"class": "tgme_widget_message_text"}) # preview = message.find('div', attrs={'class': 'tgme_widget_message_bubble'}) - link = message.find('a', attrs={'class': 'tgme_widget_message_date'}) - title = html.text if html else 'No text' - description = str(message) # if preview else '?' - link = link['href'] + link = message.find("a", attrs={"class": "tgme_widget_message_date"}) + title = html.text if html else "No text" + description = str(message) # if preview else '?' + link = link["href"] yield title, description, link, date + def main(channel): - url = 'https://t.me/s/' + channel + url = "https://t.me/s/" + channel - print("""<?xml version="1.0" encoding="UTF-8"?> + print( + """<?xml version="1.0" encoding="UTF-8"?> <rss version="2.0"> <channel> - <title>Telegram: """ + channel + """</title> - <link>""" + url + """</link> - <description>The latest entries of the telegram channel of """ +channel + """</description> - <lastBuildDate>""" + _format_date(datetime.now()) + """</lastBuildDate>""") + <title>Telegram: """ + + channel + + """</title> + <link>""" + + url + + """</link> + <description>The latest entries of the telegram channel of """ + + channel + + """</description> + <lastBuildDate>""" + + _format_date(datetime.now()) + + """</lastBuildDate>""" + ) for title, description, link, date in telegram(channel): - print(' <item>') - print(' <title><![CDATA[' + title + ']]></title>') - print(' <link>' + link + '</link>') - print(' <description><![CDATA[' + description + ']]></description>') - print(' <pubDate>' + date + '</pubDate>') + print(" <item>") + print(" <title><![CDATA[" + title + "]]></title>") + print(" <link>" + link + "</link>") + print(" <description><![CDATA[" + description + "]]></description>") + print(" <pubDate>" + date + "</pubDate>") # print(' <media:content url="' + thumbnail + b'" type="image/jpeg" />') - print(' </item>') + print(" </item>") + + print(" </channel>") + print("</rss>") - print(' </channel>') - print('</rss>') if __name__ == "__main__": if len(sys.argv) != 2: - print('Usage:', sys.argv[0], '<telegram channel>') + print("Usage:", sys.argv[0], "<telegram channel>") sys.exit(1) main(sys.argv[1]) @@ -12,42 +12,6 @@ import json bearer = None -def _format_date(dt): - """convert a datetime into an RFC 822 formatted date - Input date must be in GMT. - Stolen from PyRSS2Gen. - """ - # Looks like: - # Sat, 07 Sep 2002 00:00:01 GMT - # Can't use strftime because that's locale dependent - # - # Isn't there a standard way to do this for Python? The - # rfc822 and email.Utils modules assume a timestamp. The - # following is based on the rfc822 module. - return "%s, %02d %s %04d %02d:%02d:%02d GMT" % ( - ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"][dt.weekday()], - dt.day, - [ - "Jan", - "Feb", - "Mar", - "Apr", - "May", - "Jun", - "Jul", - "Aug", - "Sep", - "Oct", - "Nov", - "Dec", - ][dt.month - 1], - dt.year, - dt.hour, - dt.minute, - dt.second, - ) - - def getBearer(): global bearer if bearer: @@ -100,25 +64,28 @@ def twitter(user): res = urlopen(Request(url, headers=headers)) response = json.loads(res.read().decode("UTF-8")) except Exception as exc: - logging.error('Request to twitter failed.', exc_info=exc) + logging.error("Request to twitter failed.", exc_info=exc) return None feed = { - 'title': 'Twitter: ' + user, - 'url': 'https://twitter.com/' + user, - 'description': 'The latest entries of the twitter account of ' + user, - 'content': [] + "title": "Twitter: " + user, + "url": "https://twitter.com/" + user, + "description": "The latest entries of the twitter account of " + user, + "content": [], } if not response["meta"]["result_count"]: return feed - feed['content'] = [parse_tweet( + feed["content"] = [ + parse_tweet( user, tweet, response.get("includes", {}).get("tweets", []), response.get("includes", {}).get("media", []), - ) for tweet in response["data"]] + ) + for tweet in response["data"] + ] return feed @@ -132,9 +99,7 @@ def parse_tweet(user, tweet, included_tweets, included_media): if rt["type"] == "retweeted": rt_info = title[: title.index(":") + 2] - ref_tweet = next( - t for t in included_tweets if t["id"] == rt["id"] - ) + ref_tweet = next(t for t in included_tweets if t["id"] == rt["id"]) title = rt_info + ref_tweet["text"] description = rt_info + ref_tweet["text"] title, description = unshorten_urls( @@ -143,7 +108,7 @@ def parse_tweet(user, tweet, included_tweets, included_media): elif rt["type"] == "replied_to": description += "<br/>This was a reply to:<br/>" + rt["id"] elif rt["type"] == "quoted": - description += '<br/>Quoted tweet:<br/>' + rt["text"] + description += "<br/>Quoted tweet:<br/>" + rt["text"] else: description += f"<br/><br/>Unknown reference type: {rt['type']}" @@ -153,16 +118,17 @@ def parse_tweet(user, tweet, included_tweets, included_media): # Attach media enclosures = [] - included_media_keys = tweet.get('attachments', {}).get('media_keys', []) + included_media_keys = tweet.get("attachments", {}).get("media_keys", []) for included_media_key in included_media_keys: ref_media = next( t for t in included_media if t["media_key"] == included_media_key ) - if 'url' not in ref_media: continue - if ref_media.get('type', '') == 'photo': - description += "<br/><img src=\"" + ref_media['url'] + "\" />" + if "url" not in ref_media: + continue + if ref_media.get("type", "") == "photo": + description += '<br/><img src="' + ref_media["url"] + '" />' else: - enclosures.append(ref_media['url']) + enclosures.append(ref_media["url"]) # Append Retweets etc description += "<br/><br/>" @@ -183,9 +149,11 @@ def parse_tweet(user, tweet, included_tweets, included_media): "enclosures": enclosures, } + def main(channel): print(twitter(channel)) + if __name__ == "__main__": if len(sys.argv) != 2: print("Usage:", sys.argv[0], "<twitter channel>") @@ -12,11 +12,10 @@ import os import re import sys import time, datetime -# import traceback import logging from twitter import twitter -from telegram import telegram +# from telegram import telegram from netto import netto from rss import buildRSS from zdf import zdf @@ -27,25 +26,26 @@ app.secret_key = "NMcgoB.0wd+$.KVKj!F{3>U{%BBUVhL=7=5$:46rQH$Q{enCuU" def rssResponse(data): rss = buildRSS(data) - response = Response(rss, mimetype='text/xml') - response.headers['Access-Control-Allow-Origin'] = '*' + response = Response(rss, mimetype="text/xml") + response.headers["Access-Control-Allow-Origin"] = "*" return response @app.route("/") def main(): - return 'this is sparta' + return "this is sparta" @app.errorhandler(404) def not_found(e): - return 'Die angeforderte Seite konnte nicht gefunden werden.' + return "Die angeforderte Seite konnte nicht gefunden werden." @app.route("/twitter/<account>") def feedTwitter(account): return rssResponse(twitter(account)) + # @app.route("/telegram/<account>") # def feedTelegram(account): # content = [{'title': t, 'url': u, 'content': c, 'date': d} @@ -59,6 +59,7 @@ def feedTwitter(account): # response.headers['Access-Control-Allow-Origin'] = '*' # return response + @app.route("/netto/<market>") def feedNetto(market): return rssResponse(netto(market)) @@ -69,9 +70,8 @@ def filterZDFFeed(feed): return rssResponse(zdf(feed)) -if __name__ == '__main__': - logging.basicConfig(filename='./main.log', level=logging.INFO) +if __name__ == "__main__": + logging.basicConfig(filename="./main.log", level=logging.INFO) - app.config['TEMPLATES_AUTO_RELOAD'] = True + app.config["TEMPLATES_AUTO_RELOAD"] = True app.run(threaded=True) - @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import sys -sys.path.append('./') + +sys.path.append("./") from webapp import app as application @@ -4,46 +4,48 @@ from datetime import datetime from xml.dom.minidom import parse, parseString import locale + def getText(dom, element): textNode = dom.getElementsByTagName(element)[0].firstChild if textNode: return textNode.data return "" + def zdf(feed): url = f"https://www.zdf.de/rss/zdf/{feed}" try: res = urlopen(Request(url)) except Exception as exc: - logging.error('Request to zdf failed.', exc_info=exc) + logging.error("Request to zdf failed.", exc_info=exc) return None try: rss = res.read() xml = parseString(rss) except Exception as exc: - logging.error('Parsing to zdf failed.', exc_info=exc) + logging.error("Parsing to zdf failed.", exc_info=exc) return None try: - title = getText(xml, 'title') - description = getText(xml, 'description') + title = getText(xml, "title") + description = getText(xml, "description") content = [] - for show in xml.getElementsByTagName('item'): - s_url = getText(show, 'link') + for show in xml.getElementsByTagName("item"): + s_url = getText(show, "link") if not s_url: continue # Full episodes have the ID 100 - if not s_url.endswith('-100.html'): + if not s_url.endswith("-100.html"): continue - s_title = getText(show, 'title') + s_title = getText(show, "title") if not s_title.startswith(title): continue - s_date = getText(show, 'pubDate') + s_date = getText(show, "pubDate") s_date_parsed = datetime.strptime(s_date, "%a, %d %b %Y %H:%M:%S %z") if s_date_parsed.timestamp() > datetime.now().timestamp(): @@ -57,16 +59,18 @@ def zdf(feed): # tmp = datetime.strptime(s_tmp, "%d. %B %Y") # locale.setlocale(locale.LC_TIME, saved) - s_desc = getText(show, 'description') - s_guid = getText(show, 'guid') + s_desc = getText(show, "description") + s_guid = getText(show, "guid") print("Adding", s_url, s_desc) - content.append({ - 'title': s_title, - 'url': s_url, - 'content': s_desc, - 'date': s_date, - 'guid': s_guid, - }) + content.append( + { + "title": s_title, + "url": s_url, + "content": s_desc, + "date": s_date, + "guid": s_guid, + } + ) return { "title": title, @@ -75,7 +79,7 @@ def zdf(feed): "content": content, } except Exception as exc: - logging.error('Working with zdf failed.', exc_info=exc) + logging.error("Working with zdf failed.", exc_info=exc) return None @@ -84,6 +88,7 @@ def main(): # print(zdf("comedy/die-anstalt")) print(zdf("comedy/zdf-magazin-royale")) + if __name__ == "__main__": # if len(sys.argv) != 2: # print('Usage:', sys.argv[0], '<foobar>') |