added mysql cache support

This commit is contained in:
Daniel Knüttel 2019-02-19 14:16:22 +01:00
parent a933d441ce
commit 95649ace91
5 changed files with 70 additions and 35 deletions

View File

@ -18,9 +18,10 @@ def get_cache(directory, name):
cursor = db.cursor() cursor = db.cursor()
cursor.execute("CREATE TABLE proxies(proxy TEXT, lasttime_could_not_be_used DECIMAL)") cursor.execute("CREATE TABLE proxies(proxy TEXT, lasttime_could_not_be_used DECIMAL)")
cursor.execute("CREATE TABLE links(source TEXT, destination TEXT)") cursor.execute("CREATE TABLE links(source INT, destination INT)")
cursor.execute("CREATE TABLE dijkstra_helper(name TEXT UNIQUE, value INT)") cursor.execute("CREATE TABLE dijkstra_helper(page INT UNIQUE, value INT)")
cursor.execute("CREATE TABLE failed_to_fetch(title TEXT, depth INT)") cursor.execute("CREATE TABLE failed_to_fetch(page INT, depth INT)")
cursor.execute("CREATE TABLE pages(title TEXT)")
db.commit() db.commit()
db = sqlite3.connect(cache_file) db = sqlite3.connect(cache_file)
@ -36,9 +37,15 @@ def get_cache(directory, name):
cursor = db.cursor() cursor = db.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS proxies(proxy varchar(100), lasttime_could_not_be_used DECIMAL)") cursor.execute("CREATE TABLE IF NOT EXISTS proxies(proxy varchar(100), lasttime_could_not_be_used DECIMAL)")
cursor.execute("CREATE TABLE IF NOT EXISTS links(source varchar(100) CHARACTER SET utf8mb4, destination varchar(100) CHARACTER SET utf8mb4)") cursor.execute("CREATE TABLE IF NOT EXISTS links(source INT, destination INT)")
cursor.execute("CREATE TABLE IF NOT EXISTS dijkstra_helper(name varchar(100) CHARACTER SET utf8mb4 UNIQUE, value INT)") cursor.execute("CREATE TABLE IF NOT EXISTS dijkstra_helper(page INT UNIQUE, value INT)")
cursor.execute("CREATE TABLE IF NOT EXISTS failed_to_fetch(title varchar(100) CHARACTER SET utf8mb4, depth INT)") cursor.execute("CREATE TABLE IF NOT EXISTS failed_to_fetch(page INT, depth INT)")
cursor.execute('''CREATE TABLE IF NOT EXISTS
pages(
title varchar(400) CHARACTER SET utf8mb4
, page_id INT AUTO_INCREMENT
, PRIMARY KEY(page_id)
)''')
db.commit() db.commit()
fetch_proxies(db) fetch_proxies(db)

View File

@ -1,10 +1,12 @@
from collections import deque from collections import deque
from cfg import config from cfg import config
from db_util import get_page_id
def can_reach(title, connection): def can_reach(title, connection):
page = get_page_id(title, connection)
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("SELECT COUNT(destination) FROM links WHERE destination=%s", (title, )) cursor.execute("SELECT COUNT(destination) FROM links WHERE destination=%s", (page, ))
count = cursor.fetchone()[0] count = cursor.fetchone()[0]
return count > 0 return count > 0
@ -13,16 +15,17 @@ def shortest_path(center, title, connection):
return [] return []
cursor = connection.cursor() cursor = connection.cursor()
current_title = title current_page = get_page_id(title, connection)
center_page = get_page_id(center, connection)
path = deque() path = deque()
while(current_title != center): while(current_page != center_page):
path.append(current_title) path.append(current_page)
cursor.execute('''SELECT links.source cursor.execute('''SELECT links.source
FROM links FROM links
LEFT JOIN dijkstra_helper ON links.destination=dijkstra_helper.name LEFT JOIN dijkstra_helper ON links.destination=dijkstra_helper.page
WHERE links.destination=:title WHERE links.destination=:page
ORDER BY dijkstra_helper.value ASC ORDER BY dijkstra_helper.value ASC
LIMIT 1''', {"title": current_title}) LIMIT 1''', {"page": current_page})
current_title = cursor.fetchone()[0] current_page = cursor.fetchone()[0]
return list(reversed(path)) return list(reversed(path))

View File

@ -1,12 +1,16 @@
from collections import deque from collections import deque
from cfg import config from cfg import config
from db_util import get_page_id
def prepare_dijkstra(connection): def prepare_dijkstra(connection):
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute('''INSERT OR IGNORE INTO dijkstra_helper(name) cursor.execute('''INSERT OR IGNORE INTO dijkstra_helper(page)
SELECT destination FROM links SELECT destination FROM links
''') ''')
cursor.execute('''INSERT OR IGNORE INTO dijkstra_helper(page)
SELECT source FROM links
''')
if(config["use_sqlite"]): if(config["use_sqlite"]):
cursor.execute("UPDATE dijkstra_helper SET value=1e1000") cursor.execute("UPDATE dijkstra_helper SET value=1e1000")
@ -14,16 +18,16 @@ def prepare_dijkstra(connection):
cursor.execute("UPDATE dijkstra_helper SET value=2147483647") cursor.execute("UPDATE dijkstra_helper SET value=2147483647")
connection.commit() connection.commit()
def dijkstra_one(title, value, connection): def dijkstra_one(page, value, connection):
cursor = connection.cursor() cursor = connection.cursor()
if(isinstance(title, tuple)): if(isinstance(page, tuple)):
# Idk why this happens. # Idk why this happens.
title = title[0] page = page[0]
cursor.execute('''SELECT name cursor.execute('''SELECT page
FROM dijkstra_helper FROM dijkstra_helper
LEFT JOIN links ON links.destination=dijkstra_helper.name LEFT JOIN links ON links.destination=dijkstra_helper.page
WHERE links.source=:title WHERE links.source=:page
AND dijkstra_helper.value>:value''', {"title": title, "value": value + 1}) AND dijkstra_helper.value>:value''', {"page": page, "value": value + 1})
# This is the list of nodes that have to be updated # This is the list of nodes that have to be updated
result = cursor.fetchall() result = cursor.fetchall()
@ -32,8 +36,8 @@ def dijkstra_one(title, value, connection):
WHERE name IN ( WHERE page IN (
SELECT destination SELECT destination
FROM links FROM links
WHERE source=:title) WHERE source=:page)
AND dijkstra_helper.value>:value''', {"value": value + 1, "title": title}) AND dijkstra_helper.value>:value''', {"value": value + 1, "page": page})
connection.commit() connection.commit()
return result return result
@ -49,10 +53,11 @@ def recursive_dijkstra(titles, value, connection):
def dijkstra(title, connection): def dijkstra(title, connection):
page = get_page_id(title, connection)
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("UPDATE dijkstra_helper SET value=0 WHERE name=%s", (title,)) cursor.execute("UPDATE dijkstra_helper SET value=0 WHERE page=%s", (page,))
todos = dijkstra_one(title, 1, connection) todos = dijkstra_one(page, 1, connection)
recursive_dijkstra(todos, 2, connection) recursive_dijkstra(todos, 2, connection)

View File

@ -25,16 +25,20 @@ def get_data_with_proxy(url, conn_object, visit_first=None):
session.proxies = { 'http': i} session.proxies = { 'http': i}
try: try:
response = session.get(url, headers=headers, timeout=3) response = session.get(url, headers=headers, timeout=3)
except: except Exception as e:
if(isinstance(e, KeyboardInterrupt)):
raise e
# If proxy is invalid/inactive, update lasttime could not be used and go next proxy # If proxy is invalid/inactive, update lasttime could not be used and go next proxy
cursor.execute('''UPDATE proxies SET lasttime_could_not_be_used = %d WHERE proxy = %s ''', cursor.execute('''UPDATE proxies SET lasttime_could_not_be_used = %s WHERE proxy = %s ''',
(time.time(), i)) (time.time(), i))
continue continue
# If text is empty, update lasttime could not be used and go next proxy # If text is empty, update lasttime could not be used and go next proxy
if not response.text or 399 < response.status_code < 600: if not response.text or 399 < response.status_code < 600:
cursor.execute('''UPDATE proxies SET lasttime_could_not_be_used = %d WHERE proxy = %s ''', cursor.execute('''UPDATE proxies SET lasttime_could_not_be_used = %s WHERE proxy = %s ''',
(time.time(), i)) (time.time(), i))
continue continue
# Be nice to Wikipedia.
time.sleep(0.3)
return response.json() return response.json()
raise NoMoreProxiesException("No more proxies left") raise NoMoreProxiesException("No more proxies left")

View File

@ -1,6 +1,7 @@
import logging import logging
from url import construct_url from url import construct_url
from proxy import get_data_with_proxy, NoMoreProxiesException from proxy import get_data_with_proxy, NoMoreProxiesException
from db_util import get_page_id
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -19,6 +20,8 @@ def ignore_title(title):
def _receive_links(title, connection): def _receive_links(title, connection):
url = construct_url(title) url = construct_url(title)
source = get_page_id(title, connection)
result = get_data_with_proxy(url, connection) result = get_data_with_proxy(url, connection)
# This is basically because we don't know the page ID. # This is basically because we don't know the page ID.
for k, page_data in result["query"]["pages"].items(): for k, page_data in result["query"]["pages"].items():
@ -30,7 +33,8 @@ def _receive_links(title, connection):
continue continue
if(ignore_title(title)): if(ignore_title(title)):
continue continue
cursor.execute("INSERT INTO links(source, destination) VALUES(%s, %s)", (title, destination_title)) destination = get_page_id(destination_title, connection)
cursor.execute("INSERT INTO links(source, destination) VALUES(%s, %s)", (source, destination))
yield destination_title yield destination_title
else: else:
@ -38,11 +42,8 @@ def _receive_links(title, connection):
if(ignore_title(title)): if(ignore_title(title)):
continue continue
destination_title = destination["title"].replace(" ", "_") destination_title = destination["title"].replace(" ", "_")
try: destination = get_page_id(destination_title, connection)
cursor.execute("INSERT INTO links(source, destination) VALUES(%s, %s)", (title, destination_title)) cursor.execute("INSERT INTO links(source, destination) VALUES(%s, %s)", (source, destination))
except Exception as e:
print(destination_title)
raise e
yield destination_title yield destination_title
connection.commit() connection.commit()
@ -50,14 +51,28 @@ def receive_links(title, connection):
return list(_receive_links(title, connection)) return list(_receive_links(title, connection))
def receive_link_graph(title, connection, depth): def receive_link_graph(title, connection, depth, fetch_missing=True):
cursor = connection.cursor()
# Fetch the missing links.
if(fetch_missing):
delete_cursor = connection.cursor()
cursor.execute('''SELECT pages.title, failed_to_fetch.depth, failed_to_fetch.page
FROM failed_to_fetch
LEFT JOIN pages ON pages.page_id=failed_to_fetch.page''')
for t, d, p in cursor:
receive_link_graph(t, connection, d, fetch_missing=False)
delete_cursor.execute("DELETE FROM failed_to_fetch WHERE page=%s", (p,))
if(depth < 0): if(depth < 0):
# end of recursion # end of recursion
return return
page = get_page_id(title, connection)
cursor = connection.cursor() cursor = connection.cursor()
print(repr(title)) cursor.execute("SELECT COUNT(source) FROM links WHERE source=%s", (page,))
cursor.execute("SELECT COUNT(source) FROM links WHERE source=%s", (title,))
if(cursor.fetchone()[0] != 0): if(cursor.fetchone()[0] != 0):
# we fetched that title already # we fetched that title already
return return
@ -73,7 +88,8 @@ def receive_link_graph(title, connection, depth):
# Retry later, so we have to store our list that is still to fetch. # Retry later, so we have to store our list that is still to fetch.
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("INSERT INTO failed_to_fetch(title, depth) VALUES(%s, %d)", (link, depth - 1)) failed_page = get_page_id(link, connection)
cursor.execute("INSERT INTO failed_to_fetch(page, depth) VALUES(%s, %s)", (failed_page, depth - 1))
connection.commit() connection.commit()