Project/Twitter_data_collection.py — a script that authenticates with the Twitter API via tweepy, searches for tweets matching configured keywords over a recent date window, and appends the results as JSON-lines to a local file.
import tweepy | |
from tweepy import OAuthHandler | |
import json | |
import datetime as dt | |
import time | |
import os | |
import twitter_credentials | |
def load_api():
    """Authenticate against Twitter and return a ready-to-use tweepy API client."""
    handler = OAuthHandler(twitter_credentials.API_KEY,
                           twitter_credentials.API_SECRET)
    handler.set_access_token(twitter_credentials.ACCESS_TOKEN,
                             twitter_credentials.ACCESS_TOKEN_SECRET)
    # Wrap the authenticated handler in the tweepy API facade.
    return tweepy.API(handler)
def tweet_search(api, query, max_tweets, max_id, since_id):
    """Collect up to ``max_tweets`` statuses matching ``query``.

    Pages backwards through results starting just below ``max_id`` and
    never older than ``since_id``.  Returns the list of gathered status
    objects and the ID of the oldest tweet seen (the next ``max_id`` to
    resume from).  On a Twitter API error (typically rate limiting) it
    sleeps 15 minutes, then returns whatever was gathered so far.
    """
    collected = []
    while len(collected) < max_tweets:
        still_needed = max_tweets - len(collected)
        try:
            batch = api.search(q=query, count=still_needed,
                               since_id=str(since_id),
                               max_id=str(max_id - 1),
                               tweet_mode='extended',
                               lang="en")
        except tweepy.TweepError:
            # Rate limiting is the usual cause; wait out the window rather
            # than hammering the API and risking a block.
            print('waiting 15 minutes')
            time.sleep(15 * 60)
            break
        print('Found', len(batch), 'tweets')
        if not batch:
            print('No tweets found')
            break
        collected.extend(batch)
        # Oldest tweet in the batch bounds the next page.
        max_id = batch[-1].id
    return collected, max_id
def get_tweet_id(api, date='', days_ago=6, query='a'):
    """Return the ID of a tweet to use as a search boundary.

    If ``date`` is given (a ``datetime.date``), find a tweet from that
    day; otherwise pick one from ``days_ago`` days before now.  The
    Twitter search API only reaches back roughly 9 days.
    """
    if date:
        # 'until' is exclusive, so ask for the day after the target date.
        anchor = date + dt.timedelta(days=1)
        stamp = '{0}-{1:0>2}-{2:0>2}'.format(anchor.year, anchor.month, anchor.day)
        results = api.search(q=query, count=1, until=stamp)
    else:
        anchor = dt.datetime.now() - dt.timedelta(days=days_ago)
        stamp = '{0}-{1:0>2}-{2:0>2}'.format(anchor.year, anchor.month, anchor.day)
        results = api.search(q=query, count=10, until=stamp, lang="en")
    # The first (newest) match supplies the boundary ID.
    return results[0].id
def write_tweets(tweets, filename):
    """Append each tweet's raw JSON payload to ``filename``, one object per line."""
    with open(filename, 'a') as out:
        for status in tweets:
            json.dump(status._json, out)
            out.write('\n')
def main():
    """Continuously collect tweets for each search phrase and append them,
    one JSON object per line, to a local file.

    Search phrases and the day window can be adjusted below.  The Twitter
    search API only reaches back roughly 9 days, so ``max_days`` must stay
    within that limit.
    """
    search_phrases = ['#covid19vaccine']  # keywords to search for
    max_tweets = 100                      # tweets per search call (API max is 100)
    min_days, max_days = 0, 6             # search-day window; API max is ~9 days ago
                                          # min_days = 0 searches from the current date

    for search_phrase in search_phrases:
        print('Keyword used is:', search_phrase)
        # Plain literal; the original built this via pointless concatenation.
        json_file = 'Corona_Tweets.json'
        api = load_api()

        # Choose the starting (newest) tweet ID.  If a previous run left a
        # non-empty file behind, resume from its last stored tweet.
        max_id = None
        if os.path.isfile(json_file):
            with open(json_file, 'r') as f:
                lines = f.readlines()
            if lines:  # guard: an empty leftover file would crash lines[-1]
                max_id = json.loads(lines[-1])['id']
                print('Getting ID from bottom of file')
        if max_id is None:
            # Fresh start: -1 means "begin from the newest tweets";
            # otherwise anchor on a tweet that is min_days old.
            if min_days == 0:
                max_id = -1
            else:
                max_id = get_tweet_id(api, days_ago=(min_days - 1))

        # Smallest (oldest) tweet ID we are willing to collect.
        since_id = get_tweet_id(api, days_ago=(max_days - 1))

        # Gather tweets until the window is exhausted.
        # (Original used `while dt.datetime.now():` — always truthy.)
        while True:
            tweets, max_id = tweet_search(api, search_phrase + " -filter:retweets",
                                          max_tweets, max_id=max_id,
                                          since_id=since_id)
            if tweets:
                write_tweets(tweets, json_file)
            else:
                break


if __name__ == "__main__":
    main()