Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
300COM/main.py
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
210 lines (171 sloc)
6.11 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Import the modules we need | |
import re | |
import time | |
import random | |
import argparse | |
import requests_html | |
import networkx as nx | |
from selenium import webdriver | |
from pyvis.network import Network | |
from urllib.parse import urlparse, parse_qs | |
from selenium.common.exceptions import WebDriverException | |
import warnings | |
# Warning filter | |
warnings.filterwarnings("ignore") | |
# URLs already crawled in this run; checked in parse_all_citations to avoid
# re-fetching a page (and to stop infinite recursion between citing pages).
visited = set()
# Shared Selenium Chrome driver: created in run(), replaced inside
# request_html() when a page becomes unreadable.
web_driver = None
# Get page information | |
def request_html(link):
    """Fetch *link* with the shared Selenium driver and return the page's
    ``#gs_top`` element wrapped as a ``requests_html.HTML`` document.

    Polls while a captcha element is visible; if the page body cannot be
    read at all, restarts the Chrome driver and retries the same link.
    """
    global web_driver
    # Random 1-10s delay to look less like a bot before each fetch.
    time.sleep(random.randint(1, 10))
    web_driver.get(link)
    while True:
        try:
            # Captcha probe: if either element exists, fall through to the
            # 3-second wait below and re-check on the next iteration.
            # NOTE(review): find_element_by_css_selector is the Selenium 3
            # API (removed in Selenium 4) — confirm the pinned version.
            selector = "#gs_captcha_ccl,#recaptcha"
            web_driver.find_element_by_css_selector(selector)
        except WebDriverException:
            # No captcha found — try to grab the result page's markup.
            try:
                doc = web_driver.find_element_by_css_selector("#gs_top").get_attribute(
                    "innerHTML"
                )
                return requests_html.HTML(html=doc)
            except WebDriverException:
                # Page unreadable: restart the browser and retry this link.
                web_driver.close()
                web_driver = webdriver.Chrome()
                return request_html(link)
        time.sleep(3)
# Get the title of all articles on the whole page | |
def parse_all_citations(link, layers=1, pages=1):
    """Yield ``(citation_from, citation_to)`` pairs scraped from *link*.

    *link* is a Scholar "Cited by" results URL. ``citation_to`` describes
    the publication being cited (or None when no title anchor is present);
    each ``citation_from`` is the metadata dict of one citing article.
    Recurses into each article's own "Cited by" page while ``layers`` > 0,
    and follows the "Next" pagination link while ``pages`` > 1. Pages in
    the module-level ``visited`` set are skipped.
    """
    if link in visited:
        return
    document = request_html(link)
    visited.add(link)
    # The anchor at the top of the results block holds the cited work's title.
    element_a = document.find("#gs_res_ccl_top a", first=True)
    if element_a:
        citation_to = {
            "id": parse_cluster_id(link),
            "title": element_a.text,
        }
        # The "N results" banner gives the total citation count.
        results = document.find("#gs_ab_md .gs_ab_mdw", first=True)
        if results:
            match = re.search("([0-9,]+) results", results.text)
            if match:
                citation_to["cited_by"] = int(match.group(1).replace(",", ""))
    else:
        citation_to = None
    for article in document.find("#gs_res_ccl_mid .gs_r"):
        citation_from = parse_publication_metadata(article)
        if citation_from:
            yield citation_from, citation_to
        else:
            # Entry without a usable publication id — skip it entirely.
            continue
        # Descend one layer into the articles that cite this article.
        if layers > 0 and citation_from["cited_by_url"]:
            yield from parse_all_citations(
                citation_from["cited_by_url"], layers=layers - 1, pages=pages
            )
    if pages > 1:
        # Same layer, next page of results.
        for link in document.find("#gs_n a"):
            if link.text == "Next":
                yield from parse_all_citations(
                    "https://scholar.google.com" + link.attrs["href"],
                    layers=layers,
                    pages=pages - 1,
                )
# Get specific information | |
def parse_cluster_id(link):
    """Extract a Scholar cluster id from *link*'s query string.

    Looks at the ``cluster`` parameter first, then ``cites``. Returns the
    id string when the parameter holds exactly one value, else None.
    """
    # Parse the query string once instead of once per key.
    query = parse_qs(urlparse(link).query)
    for key in ("cluster", "cites"):
        values = query.get(key, [])
        if len(values) == 1:
            return values[0]
    return None
# Store as dictionary | |
def delete_none_value(original_dict):
    """Return a copy of *original_dict* with every None-valued key removed."""
    return {key: value for key, value in original_dict.items() if value is not None}
# Pull the required information and print it out | |
def parse_publication_metadata(element):
    """Build a metadata dict for one search-result entry (a ``.gs_r`` node).

    Returns None when no publication id can be derived; fields that cannot
    be parsed stay None (stripped later by delete_none_value).
    """
    pub_id = parse_pub_id(element)
    if not pub_id:
        return None
    element_a = element.find(".gs_rt a", first=True)
    if element_a:
        target_url = element_a.attrs["href"]
        article_title = element_a.text
    else:
        # Citation-only entry (no link): the title lives in a .gs_ctu span.
        target_url = None
        article_title = element.find(".gs_rt .gs_ctu", first=True).text
    article_authors = None
    article_source = None
    article_website = None
    cited_by_count = None
    cited_by_link = None
    # The .gs_a byline is dash-separated: "authors - source - website".
    # NOTE(review): assumes a .gs_a node always exists; find() returning
    # None would raise AttributeError here — confirm against live pages.
    meta_text = element.find(".gs_a", first=True).text
    meta_split = [match.strip() for match in re.split(r"\W-\W", meta_text)]
    if len(meta_split) == 3:
        article_authors, article_source, article_website = meta_split
    elif len(meta_split) == 2:
        article_authors, article_source = meta_split
    # When the source contains a comma, the year is its last token.
    if article_source and "," in article_source:
        pub_year = article_source.split(",")[-1].strip()
    else:
        pub_year = article_source
    for element_a in element.find(".gs_fl a"):
        if "Cited by" in element_a.text:
            # requests_html Element.search pulls the integer out of the text.
            cited_by_count = element_a.search("Cited by {:d}")[0]
            cited_by_link = "https://scholar.google.com" + element_a.attrs["href"]
    return {
        "id": pub_id,
        "url": target_url,
        "title": article_title,
        "authors": article_authors,
        "year": pub_year,
        "cited_by": cited_by_count,
        "cited_by_url": cited_by_link,
    }
# judge | |
def parse_pub_id(element):
    """Derive a publication id for a result entry.

    Prefers the cluster id embedded in the entry's "Cited by"/"versions"
    link; falls back to the element's data-cid attribute (or None).
    """
    for anchor in element.find(".gs_fl a"):
        label = anchor.text
        if "Cited by" in label or "versions" in label:
            return parse_cluster_id(anchor.attrs["href"])
    return element.attrs.get("data-cid")
# Analyze URL | |
def parse(argv=None):
    """Parse command-line arguments for the crawler.

    Parameters
    ----------
    argv : list[str] | None
        Argument list to parse. Defaults to None, which makes argparse
        read ``sys.argv[1:]`` — so existing callers are unaffected.

    Returns
    -------
    argparse.Namespace with ``url`` (str), ``layers`` (int, default 1)
    and ``pages`` (int, default 1) attributes.
    """
    argument_parser = argparse.ArgumentParser()
    argument_parser.add_argument("url", help="Google Scholar 'Cited by' URL to start from")
    argument_parser.add_argument("--layers", type=int, default=1,
                                 help="citation layers to recurse into")
    argument_parser.add_argument("--pages", type=int, default=1,
                                 help="result pages to follow per query")
    return argument_parser.parse_args(argv)
# function | |
def run():
    """Entry point: crawl citations starting from the CLI-supplied URL and
    save an interactive pyvis graph to ``visualization.html``."""
    global web_driver
    parsed_args = parse()
    web_driver = webdriver.Chrome()
    diGraph = nx.DiGraph()
    for citation_from, citation_to in parse_all_citations(
        parsed_args.url, layers=parsed_args.layers, pages=parsed_args.pages
    ):
        # One node per publication id; None-valued fields are dropped first
        # because pyvis/networkx attributes must be concrete values.
        diGraph.add_node(citation_from["id"], label=citation_from["title"], **delete_none_value(citation_from))
        if citation_to is not None:
            print("%s -> %s" % (citation_from["title"], citation_to["title"]))
            diGraph.add_node(citation_to["id"], label=citation_to["title"], **delete_none_value(citation_to))
            # Edge direction: citing publication -> cited publication.
            diGraph.add_edge(citation_from["id"], citation_to["id"])
    # After execution, all data will be visualized and saved using physics
    network = Network()
    network.from_nx(diGraph)
    network.show_buttons(filter_=["physics"])
    network.show("visualization.html")
    web_driver.close()
# Run the crawler only when executed as a script, not when imported.
if __name__ == "__main__":
    run()