finished of mysql/sqlite cross support

This commit is contained in:
Daniel Knüttel 2019-02-25 14:14:55 +01:00
parent 8679f25e82
commit a623cf8497
8 changed files with 138 additions and 63 deletions

View File

@ -1,8 +1,13 @@
config = { config = {
"use_sqlite": False "use_sqlite": True
, "mysql_server": "172.17.0.2" , "mysql_server": "172.17.0.2"
, "mysql_user": "wikipedia" , "mysql_user": "wikipedia"
, "mysql_password": "wikipediastuff" , "mysql_password": "wikipediastuff"
, "mysql_database": "wikipedia_link_db" , "mysql_database": "wikipedia_link_db"
} }
if(config["use_sqlite"]):
config["sql_method"] = "sqlite"
else:
config["sql_method"] = "mysql"

View File

@ -2,11 +2,12 @@ from collections import deque
from cfg import config from cfg import config
from db_util import get_page_id from db_util import get_page_id
import sql
def can_reach(title, connection): def can_reach(title, connection):
page = get_page_id(title, connection) page = get_page_id(title, connection)
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("SELECT COUNT(destination) FROM links WHERE destination=%s", (page, )) cursor.execute(sql.statements["count_links_to"], (page, ))
count = cursor.fetchone()[0] count = cursor.fetchone()[0]
return count > 0 return count > 0
@ -20,12 +21,7 @@ def shortest_path(center, title, connection):
path = deque() path = deque()
while(current_page != center_page): while(current_page != center_page):
path.append(current_page) path.append(current_page)
cursor.execute('''SELECT links.source cursor.execute(sql.statements["dijkstra_backtrack_one"], (current_page,))
FROM links
LEFT JOIN dijkstra_helper ON links.destination=dijkstra_helper.page
WHERE links.destination=%s
ORDER BY dijkstra_helper.value ASC
LIMIT 1''', (current_page,))
current_page = cursor.fetchone()[0] current_page = cursor.fetchone()[0]
return list(reversed(path)) return list(reversed(path))

View File

@ -1,11 +1,9 @@
from cfg import config from cfg import config
import sql
def _get_page_id(title, connection): def _get_page_id(title, connection):
cursor = connection.cursor() cursor = connection.cursor()
if(config["use_sqlite"]): cursor.execute(sql.statements["get_page_id"], (title,))
cursor.execute("SELECT rowid FROM pages WHERE title=%s", (title,))
else:
cursor.execute("SELECT page_id FROM pages WHERE title=%s", (title,))
return cursor.fetchone() return cursor.fetchone()
def get_page_id(title, connection): def get_page_id(title, connection):
@ -15,14 +13,11 @@ def get_page_id(title, connection):
return result[0] return result[0]
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("INSERT INTO pages(title) VALUES(%s)", (title,)) cursor.execute(sql.statements["insert_page"], (title,))
return _get_page_id(title, connection)[0] return _get_page_id(title, connection)[0]
def get_page_title(page_id, connection): def get_page_title(page_id, connection):
cursor = connection.cursor() cursor = connection.cursor()
if(config["use_sqlite"]): cursor.execute(sql.statements["get_page_title"], (page_id,))
cursor.execute("SELECT title FROM pages WHERE rowid=%s", (page_id,))
else:
cursor.execute("SELECT title FROM pages WHERE page_id=%s", (page_id,))
return cursor.fetchone()[0] return cursor.fetchone()[0]

View File

@ -2,23 +2,13 @@ from collections import deque
from cfg import config from cfg import config
from db_util import get_page_id from db_util import get_page_id
import sql
def prepare_dijkstra(connection): def prepare_dijkstra(connection):
cursor = connection.cursor() cursor = connection.cursor()
if(config["use_sqlite"]): cursor.execute(sql.statements["dijkstra_insert_pages"])
cursor.execute('''INSERT OR IGNORE INTO dijkstra_helper(page)
SELECT rowid FROM pages
''')
else:
cursor.execute('''INSERT IGNORE INTO dijkstra_helper(page)
SELECT page_id FROM pages
''')
cursor.execute(sql.statements["dijkstra_set_infinity"])
if(config["use_sqlite"]):
cursor.execute("UPDATE dijkstra_helper SET value=1e1000")
else:
cursor.execute("UPDATE dijkstra_helper SET value=2147483647")
connection.commit() connection.commit()
def dijkstra_one(page, value, connection): def dijkstra_one(page, value, connection):
@ -26,21 +16,11 @@ def dijkstra_one(page, value, connection):
if(isinstance(page, tuple)): if(isinstance(page, tuple)):
# Idk why this happens. # Idk why this happens.
title = title[0] title = title[0]
cursor.execute('''SELECT page cursor.execute(sql.statements["dijkstra_get_to_update"], (page, value + 1))
FROM dijkstra_helper
LEFT JOIN links ON links.destination=dijkstra_helper.page
WHERE links.source=%s
AND dijkstra_helper.value>%s''', (page, value + 1))
# This is the list of nodes that have to be updated # This is the list of nodes that have to be updated
result = cursor.fetchall() result = cursor.fetchall()
cursor.execute('''UPDATE dijkstra_helper cursor.execute(sql.statements["dijkstra_update"], (value + 1, page, value + 1))
SET value=%s
WHERE page IN (
SELECT destination
FROM links
WHERE source=%s)
AND dijkstra_helper.value>%s''', (value + 1, page, value + 1))
connection.commit() connection.commit()
return result return result
@ -58,7 +38,7 @@ def recursive_dijkstra(titles, value, connection):
def dijkstra(title, connection): def dijkstra(title, connection):
page = get_page_id(title, connection) page = get_page_id(title, connection)
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("UPDATE dijkstra_helper SET value=0 WHERE page=%s", (page,)) cursor.execute(sql.statements["dijkstra_set_root"], (page,))
todos = dijkstra_one(page, 1, connection) todos = dijkstra_one(page, 1, connection)
recursive_dijkstra(todos, 2, connection) recursive_dijkstra(todos, 2, connection)

View File

@ -2,6 +2,7 @@ from collections import deque, defaultdict
import logging import logging
from cfg import config from cfg import config
import sql
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -15,11 +16,11 @@ class DijkstraHelper(object):
@classmethod @classmethod
def from_db(cls, connection): def from_db(cls, connection):
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("SELECT page_id FROM pages") cursor.execute(sql.statements["get_all_page_ids"])
nodes = [n[0] for n in cursor.fetchall()] nodes = [n[0] for n in cursor.fetchall()]
connections = defaultdict(list) connections = defaultdict(list)
cursor.execute("SELECT source, destination FROM links") cursor.execute(sql.statements["get_links"])
for source, destination in cursor: for source, destination in cursor:
connections[source].append(destination) connections[source].append(destination)
@ -47,8 +48,8 @@ class DijkstraHelper(object):
def write_back(self, connection): def write_back(self, connection):
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("DELETE FROM dijkstra_helper") cursor.execute(sql.statements["delete_dijkstra"])
cursor.executemany("INSERT INTO dijkstra_helper(page, value) VALUES(%s, %s)", list(self._nodes.items())) cursor.executemany(sql.statements["insert_dijkstra_values"], list(self._nodes.items()))

View File

@ -9,6 +9,7 @@ import time
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from cfg import config from cfg import config
import sql
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -17,8 +18,9 @@ class NoMoreProxiesException(Exception):
def get_data_with_proxy(url, conn_object, visit_first=None): def get_data_with_proxy(url, conn_object, visit_first=None):
cursor = conn_object.cursor() cursor = conn_object.cursor()
update_cursor = conn_object.cursor()
# Assume that table name is proxies # Assume that table name is proxies
cursor.execute('''SELECT proxy, lasttime_could_not_be_used FROM proxies ORDER BY lasttime_could_not_be_used ASC''') cursor.execute(sql.statements["get_proxies"])
headers = {} headers = {}
for i, lasttime_could_not_be_used in cursor: for i, lasttime_could_not_be_used in cursor:
session = requests.Session() session = requests.Session()
@ -29,13 +31,11 @@ def get_data_with_proxy(url, conn_object, visit_first=None):
if(isinstance(e, KeyboardInterrupt)): if(isinstance(e, KeyboardInterrupt)):
raise e raise e
# If proxy is invalid/inactive, update lasttime could not be used and go next proxy # If proxy is invalid/inactive, update lasttime could not be used and go next proxy
cursor.execute('''UPDATE proxies SET lasttime_could_not_be_used = %s WHERE proxy = %s ''', update_cursor.execute(sql.statements["update_proxies"], (time.time(), i))
(time.time(), i))
continue continue
# If text is empty, update lasttime could not be used and go next proxy # If text is empty, update lasttime could not be used and go next proxy
if not response.text or 399 < response.status_code < 600: if not response.text or 399 < response.status_code < 600:
cursor.execute('''UPDATE proxies SET lasttime_could_not_be_used = %s WHERE proxy = %s ''', update_cursor.execute(sql.statements["update_proxies"], (time.time(), i))
(time.time(), i))
continue continue
# Be nice to Wikipedia. # Be nice to Wikipedia.
time.sleep(0.1) time.sleep(0.1)
@ -63,7 +63,7 @@ def fetch_proxies(connection):
url = "http://{}:{}".format(ip_addr, port) url = "http://{}:{}".format(ip_addr, port)
if(not proxy_is_in_db(url, connection)): if(not proxy_is_in_db(url, connection)):
cursor.execute("INSERT INTO proxies VALUES(%s, 0)", (url,)) cursor.execute(sql.statements["insert_proxy"], (url,))
cnt += 1 cnt += 1
logging.info("added {} new proxies".format(cnt)) logging.info("added {} new proxies".format(cnt))
connection.commit() connection.commit()
@ -92,7 +92,7 @@ def _get_rows(soup):
def proxy_is_in_db(url, connection): def proxy_is_in_db(url, connection):
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("SELECT proxy FROM proxies WHERE proxy = %s", (url,)) cursor.execute(sql.statements["proxy_in_db"], (url,))
return bool(cursor.fetchall()) return bool(cursor.fetchall())

View File

@ -5,6 +5,7 @@ from cfg import config
from url import construct_url from url import construct_url
from proxy import get_data_with_proxy, NoMoreProxiesException from proxy import get_data_with_proxy, NoMoreProxiesException
from db_util import get_page_id, get_page_title from db_util import get_page_id, get_page_title
import sql
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -39,7 +40,7 @@ def _receive_links(page, connection):
if(ignore_title(destination_title)): if(ignore_title(destination_title)):
continue continue
destination = get_page_id(destination_title, connection) destination = get_page_id(destination_title, connection)
cursor.execute("INSERT INTO links(source, destination) VALUES(%s, %s)", (page, destination)) cursor.execute(sql.statements["insert_link"], (page, destination))
yield destination yield destination
else: else:
@ -48,7 +49,7 @@ def _receive_links(page, connection):
if(ignore_title(destination_title)): if(ignore_title(destination_title)):
continue continue
destination = get_page_id(destination_title, connection) destination = get_page_id(destination_title, connection)
cursor.execute("INSERT INTO links(source, destination) VALUES(%s, %s)", (page, destination)) cursor.execute(sql.statements["insert_link"], (page, destination))
yield destination yield destination
connection.commit() connection.commit()
@ -61,7 +62,7 @@ def receive_link_graph(title, connection, depth):
do_receive_link_graph(page, connection, depth, fetch_missing=True) do_receive_link_graph(page, connection, depth, fetch_missing=True)
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("SELECT COUNT(page) FROM failed_to_fetch") cursor.execute(sql.statements["count_failed_to_fetch"])
if(cursor.fetchone()[0]): if(cursor.fetchone()[0]):
do_receive_link_graph(page, connection, depth, fetch_missing=True) do_receive_link_graph(page, connection, depth, fetch_missing=True)
@ -75,17 +76,15 @@ def do_receive_link_graph(page, connection, depth, fetch_missing=False):
# Fetch the missing links. # Fetch the missing links.
if(fetch_missing): if(fetch_missing):
delete_cursor = connection.cursor() delete_cursor = connection.cursor()
cursor.execute('''SELECT failed_to_fetch.depth, failed_to_fetch.page cursor.execute(sql.statements["get_failed_to_fetch"])
FROM failed_to_fetch
''')
for d, p in cursor: for d, p in cursor:
do_receive_link_graph(p, connection, d, fetch_missing=False) do_receive_link_graph(p, connection, d, fetch_missing=False)
delete_cursor.execute("DELETE FROM failed_to_fetch WHERE page=%s", (p,)) delete_cursor.execute(sql.statements["delete_failed_to_fetch"], (p,))
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("SELECT COUNT(source) FROM links WHERE source=%s", (page,)) cursor.execute(sql.statements["count_links_from"], (page,))
if(cursor.fetchone()[0] != 0): if(cursor.fetchone()[0] != 0):
# we fetched that title already # we fetched that title already
return return
@ -101,7 +100,7 @@ def do_receive_link_graph(page, connection, depth, fetch_missing=False):
# Retry later, so we have to store our list that is still to fetch. # Retry later, so we have to store our list that is still to fetch.
cursor = connection.cursor() cursor = connection.cursor()
cursor.execute("INSERT INTO failed_to_fetch(page, depth) VALUES(%s, %s)", (link, depth - 1)) cursor.execute(sql.statements["insert_failed_to_fetch"], (link, depth - 1))
connection.commit() connection.commit()

99
exam/ex01/sql.py Normal file
View File

@ -0,0 +1,99 @@
from cfg import config
sql_statements = {
"update_proxies": {"sqlite": '''UPDATE proxies SET lasttime_could_not_be_used = ? WHERE proxy = ? '''
, "mysql": '''UPDATE proxies SET lasttime_could_not_be_used = %s WHERE proxy = %s '''}
, "get_proxies": {"sqlite": '''SELECT proxy, lasttime_could_not_be_used FROM proxies ORDER BY lasttime_could_not_be_used ASC'''
, "mysql": '''SELECT proxy, lasttime_could_not_be_used FROM proxies ORDER BY lasttime_could_not_be_used ASC'''}
, "insert_proxy": {"sqlite": "INSERT INTO proxies VALUES(?, 0)"
, "mysql": "INSERT INTO proxies VALUES(%s, 0)"}
, "proxy_in_db": {"sqlite": "SELECT proxy FROM proxies WHERE proxy = ?"
, "mysql": "SELECT proxy FROM proxies WHERE proxy = %s"}
, "insert_link": {"sqlite": "INSERT INTO links(source, destination) VALUES(?, ?)"
, "mysql": "INSERT INTO links(source, destination) VALUES(%s, %s)"}
, "count_failed_to_fetch": {"sqlite": "SELECT COUNT(page) FROM failed_to_fetch"
, "mysql": "SELECT COUNT(page) FROM failed_to_fetch"}
, "get_failed_to_fetch": {"sqlite": '''SELECT failed_to_fetch.depth, failed_to_fetch.page
FROM failed_to_fetch
'''
, "mysql": '''SELECT failed_to_fetch.depth, failed_to_fetch.page
FROM failed_to_fetch
'''}
, "delete_failed_to_fetch": {"sqlite": "DELETE FROM failed_to_fetch WHERE page=?"
, "mysql": "DELETE FROM failed_to_fetch WHERE page=%s"}
, "count_links_from": {"sqlite": "SELECT COUNT(source) FROM links WHERE source=?"
, "mysql": "SELECT COUNT(source) FROM links WHERE source=%s"}
, "insert_failed_to_fetch": {"sqlite": "INSERT INTO failed_to_fetch(page, depth) VALUES(?, ?)"
, "mysql": "INSERT INTO failed_to_fetch(page, depth) VALUES(%s, %s)"}
, "count_links_to": {"sqlite": "SELECT COUNT(destination) FROM links WHERE destination=?"
, "mysql": "SELECT COUNT(destination) FROM links WHERE destination=%s"}
, "dijkstra_backtrack_one": {"sqlite": '''SELECT links.source
FROM links
LEFT JOIN dijkstra_helper ON links.destination=dijkstra_helper.page
WHERE links.destination=?
ORDER BY dijkstra_helper.value ASC
LIMIT 1'''
, "mysql": '''SELECT links.source
FROM links
LEFT JOIN dijkstra_helper ON links.destination=dijkstra_helper.page
WHERE links.destination=%s
ORDER BY dijkstra_helper.value ASC
LIMIT 1'''}
, "get_page_id": {"sqlite": "SELECT rowid FROM pages WHERE title=?"
, "mysql": "SELECT page_id FROM pages WHERE title=%s"}
, "insert_page": {"sqlite": "INSERT INTO pages(title) VALUES(?)"
, "mysql": "INSERT INTO pages(title) VALUES(%s)"}
, "get_page_title": {"sqlite": "SELECT title FROM pages WHERE rowid=?"
, "mysql": "SELECT title FROM pages WHERE page_id=%s"}
, "dijkstra_insert_pages": {"sqlite": '''INSERT OR IGNORE INTO dijkstra_helper(page)
SELECT rowid FROM pages
'''
, "mysql": '''INSERT IGNORE INTO dijkstra_helper(page)
SELECT page_id FROM pages
'''}
, "dijkstra_set_infinity": {"sqlite": "UPDATE dijkstra_helper SET value=1e1000"
, "mysql": "UPDATE dijkstra_helper SET value=2147483647"}
, "dijkstra_get_to_update": {"sqlite": '''SELECT page
FROM dijkstra_helper
LEFT JOIN links ON links.destination=dijkstra_helper.page
WHERE links.source=?
AND dijkstra_helper.value>?'''
, "mysql": '''SELECT page
FROM dijkstra_helper
LEFT JOIN links ON links.destination=dijkstra_helper.page
WHERE links.source=%s
AND dijkstra_helper.value>%s'''}
, "dijkstra_update": {"sqlite": '''UPDATE dijkstra_helper
SET value=?
WHERE page IN (
SELECT destination
FROM links
WHERE source=?)
AND dijkstra_helper.value>?'''
, "mysql": '''UPDATE dijkstra_helper
SET value=%s
WHERE page IN (
SELECT destination
FROM links
WHERE source=%s)
AND dijkstra_helper.value>%s'''}
, "dijkstra_set_root": {"sqlite": "UPDATE dijkstra_helper SET value=0 WHERE page=?"
, "mysql": "UPDATE dijkstra_helper SET value=0 WHERE page=%s"}
, "get_all_page_ids": {"sqlite": "SELECT rowid FROM pages"
, "mysql": "SELECT page_id FROM pages"}
, "get_links": {"sqlite": "SELECT source, destination FROM links"
, "mysql": "SELECT source, destination FROM links"}
, "delete_dijkstra": {"sqlite": "DELETE FROM dijkstra_helper"
, "mysql": "DELETE FROM dijkstra_helper"}
, "insert_dijkstra_values": {"sqlite": "INSERT INTO dijkstra_helper(page, value) VALUES(?, ?)"
, "mysql": "INSERT INTO dijkstra_helper(page, value) VALUES(%s, %s)"}
}
statements = {name: statement[config["sql_method"]] for name, statement in sql_statements.items()}