1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
#!/usr/bin/env python3
import datetime
import difflib
import sqlite3
import time
import requests
import os
from bs4 import BeautifulSoup
from rss_types import RSSItem, RSSFeed
def extract_text(content) -> str:
soup = BeautifulSoup(content, features="html.parser")
for script in soup(["script", "style"]):
script.extract()
return soup.get_text(separator="\n", strip=True)
def compare_to_html(
text_old: str,
text_new: str,
) -> str:
if not text_old or not text_new:
return "N/A"
output = ["<pre>"]
for line in difflib.Differ().compare(text_old.splitlines(), text_new.splitlines()):
if line.startswith("+"):
output.append(f"🟢 {line.strip()[2:]}")
elif line.startswith("-"):
output.append(f"🔴 {line.strip()[2:]}")
elif line.startswith("?"):
output.append(f"🔵 {line.strip()[2:]}")
else:
output.append(f"<small>🔵 {line.strip()}</small>")
output.append("</pre>")
return "\n".join(output)
def get_page_delta(url):
conn = sqlite3.connect(os.path.join(os.path.dirname(__file__), "database", "website_data.db"))
cursor = conn.cursor()
# Initialize database, if needed
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS websites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT,
date_added INTEGER,
last_fetched INTEGER
);
"""
)
cursor.execute(
"""
CREATE TABLE IF NOT EXISTS deltas (
website_id INTEGER,
headers TEXT,
content TEXT,
fetch_date INTEGER
);
"""
)
conn.commit()
# Add debug info
cursor.execute("PRAGMA table_info(deltas)")
existing_cols = [row[1] for row in cursor.fetchall()]
if "extracted_old" not in existing_cols:
cursor.execute("ALTER TABLE deltas ADD COLUMN {} text".format("extracted_old"))
cursor.execute("ALTER TABLE deltas ADD COLUMN {} text".format("extracted_new"))
conn.commit()
# Check, if current website is known. Get latest state, if known.
cursor.execute("SELECT id, last_fetched FROM websites WHERE url = ?", (url,))
id = last_fetched = last_content = None
data = cursor.fetchone()
if data:
id, last_fetched = data
cursor.execute("SELECT content FROM deltas WHERE website_id = ? ORDER BY fetch_date desc LIMIT 1", (id,))
last_content = cursor.fetchone()
if last_content:
last_content = last_content[0]
else:
cursor.execute(
"INSERT INTO websites (url, date_added, last_fetched) VALUES (?, ?, ?)",
(url, int(time.time()), int(time.time())),
)
conn.commit()
id = cursor.lastrowid
if not last_fetched or int(time.time()) - last_fetched > 3600:
response = requests.get(url, timeout=20)
cursor.execute("UPDATE websites SET last_fetched = ? WHERE id = ?", (int(time.time()), id))
extracted_new = extract_text(response.content)
extracted_old = extract_text(last_content)
if extracted_new != extracted_old:
cursor.execute(
"INSERT INTO deltas (website_id, headers, content, fetch_date, extracted_old, extracted_new) VALUES (?, ?, ?, ?, ?, ?)",
(id, str(response.headers), response.content, int(time.time()), extracted_old, extracted_new),
)
conn.commit()
cursor.execute("SELECT extracted_old, extracted_new, fetch_date FROM deltas WHERE website_id = ?", (id,))
updates = []
for update in cursor.fetchall():
extracted_old, extracted_new, fetch_date = update
updates.append(
RSSItem(
title=f"Change on {url}",
url=url,
content=compare_to_html(extracted_old, extracted_new),
date=datetime.datetime.fromtimestamp(fetch_date, tz=datetime.UTC),
enclosures=[],
guid=update[2],
)
)
return RSSFeed(
title=f"Updates for {url}",
url=url,
description=f"Detected changes on page {url}",
content=updates,
)
|