Scraping Instagram for Hashtag data
Over the past few months, I've been actively using python and I have made a few scripts to scrape #hashtag data from Instagram.
It all started with some basic script I had made early 2017 and I have been adding and modifying it ever since. Over the last few months, I made progress in my own skill of Python, successfully adding things like user agent and proxy rotation.
Now that I have a tool that does exactly what I want, I'm looking to:
- Optimize code structure (it's really copying and pasting mostly) and removing 'crappy' code.
Therefore I'm hoping SO can help me analyze my code and suggest optimizations.
My script does the following:
- It analyzes hashtags from the input file (hashtags.txt)
- It then scrapes data from Instagram (like post count, average engagement,...)
- This data is then stored in a .csv. Which is being processed again afterward to remove duplicates.
I also included user agent randomization and proxy rotation.
However, I feel like my code is far from optimal and when I want to add additional things (like catching HTTP errors, retrying on proxy timeouts,...) I'm just adding more levels of indentation so I'm pretty sure there are other options there!
Any help or feedback to optimize my code below is GREATLY appreciated!
# This script is written for personal research and is not endorsed by Instagram.
# Use at your own risk!
# -*- coding: utf-8 -*-
import csv
import requests
import urllib.request
import json
import re
import random
import time
from fake_useragent import UserAgent
from random import randint
from time import sleep
ua = UserAgent(cache=False)
ts = time.gmtime()
timestamp = time.strftime("%d-%m-%Y %H-%M", ts)
def get_csv_header(top_numb):
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes','MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
return fieldnames
def write_csv_header(filename, headers):
with open(filename, 'w', newline='') as f_out:
writer = csv.DictWriter(f_out, fieldnames=headers)
writer.writeheader()
return
def read_keywords(t_file):
with open(t_file) as f:
keyword_list = f.read().splitlines()
return keyword_list
def read_proxies(p_file):
with open(p_file) as f:
proxy_list = f.read().splitlines()
return proxy_list
#file
data_filename = 'Hashtag Scrape ' + timestamp + '.csv'
KEYWORD_FILE = './hashtags.txt'
DATA_FILE = './' + data_filename
PROXY_FILE = './proxies.txt'
keywords = read_keywords(KEYWORD_FILE)
proxies = read_proxies(PROXY_FILE)
csv_headers = get_csv_header(9)
write_csv_header(DATA_FILE, csv_headers)
#Ask for randomisation input fields
low = input("Please enter minimal delay time (in seconds): ")
low_random = int(low)
high = input("Please enter maximal delay time (in seconds): ")
high_random = int(high)
#get the data
for keyword in keywords:
import urllib, json
if len(proxies)!=0:
proxy_ip = random.choice(proxies)
proxy_support = urllib.request.ProxyHandler({'https':proxy_ip})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
prepare_url = urllib.request.Request(
'https://www.instagram.com/explore/tags/' + urllib.parse.quote_plus(keyword) + '/?__a=1',
headers={
'User-Agent': ua.random
}
)
url = urllib.request.urlopen(prepare_url)
post_info = {}
response = json.load(url) #response is the JSON dump of the url.
#defining some script helpers
x = len(response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'])
i = avg_post_likes = 0
likes_value =
comments_value =
#Getting the general tag data
hashtag_name = response['graphql']['hashtag']['name']
post_count = response['graphql']['hashtag']['edge_hashtag_to_media']['count']
hashtag_url = 'https://www.instagram.com/explore/tags/' + keyword
post_ready_tag = '#' + keyword
top_posts = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges']
#calculate the active days ago
most_recent_post = response['graphql']['hashtag']['edge_hashtag_to_media']['edges'][0]['node']['taken_at_timestamp']
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
most_recent_clean = datetime.strptime(post_cleandate, '%Y-%m-%d')
today = datetime.strptime(str(date.today()),'%Y-%m-%d')
posted_days_ago = relativedelta.relativedelta(today, most_recent_clean).days
while i <=x-1:
#Getting data from top posts
top_post_likes = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']
post_like = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']['count']
post_comment = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_media_to_comment']['count']
likes_value.append(post_like)
comments_value.append(post_comment)
i += 1
print('Writing ' + keyword + ' to output file')
with open(data_filename, 'a', newline='', encoding='utf-8') as data_out:
post_info["Hashtag"] = hashtag_name
post_info["Active Days Ago"] = posted_days_ago
post_info["Post Count"] = post_count
post_info["AVG. Likes"] = round(sum(likes_value)/len(likes_value),2)
post_info["MAX. Likes"] = max(likes_value)
post_info["MIN. Likes"] = min(likes_value)
post_info["AVG. Comments"] = round(sum(comments_value)/len(comments_value),2)
post_info["Hashtag URL"] = hashtag_url
post_info["Post Ready Tag"] = post_ready_tag
csv_writer = csv.DictWriter(data_out, fieldnames=csv_headers)
csv_writer.writerow(post_info)
#Randomly pause script based on input values
sleep(randint(low_random,high_random))
#cleaning up the file:
destination = data_filename[:-4] + '_unique.csv'
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
# Let the user know you are starting, in case you are de-dupping a huge file
print("nRemoving duplicates from %r" % data_filename)
# Initialize variables and counters
unique_lines = set()
source_lines = 0
duplicate_lines = 0
# Loop through data, write uniques to output file, skip duplicates.
for line in data:
source_lines += 1
# Strip out the junk for an easy set check, also saves memory
line_to_check = line.strip('rn')
if line_to_check in unique_lines: # Skip if line is already in set
duplicate_lines += 1
continue
else: # Write if new and append stripped line to list of seen lines
target.write(line)
unique_lines.add(line_to_check)
# Be nice and close out the files
target.close()
data.close()
import os
os.remove(data_filename)
os.rename(destination, data_filename)
print("SUCCESS: Removed %d duplicate line(s) from file with %d line(s)." %
(duplicate_lines, source_lines))
print("Wrote output to %rn" % data_filename)
print("n" + 'ALL DONE !!!! ')
For those interested, this is how the output file looks:
Thanks in advance! <3
python python-3.x web-scraping
New contributor
add a comment |
Over the past few months, I've been actively using python and I have made a few scripts to scrape #hashtag data from Instagram.
It all started with some basic script I had made early 2017 and I have been adding and modifying it ever since. Over the last few months, I made progress in my own skill of Python, successfully adding things like user agent and proxy rotation.
Now that I have a tool that does exactly what I want, I'm looking to:
- Optimize code structure (it's really copying and pasting mostly) and removing 'crappy' code.
Therefore I'm hoping SO can help me analyze my code and suggest optimizations.
My script does the following:
- It analyzes hashtags from the input file (hashtags.txt)
- It then scrapes data from Instagram (like post count, average engagement,...)
- This data is then stored in a .csv. Which is being processed again afterward to remove duplicates.
I also included user agent randomization and proxy rotation.
However, I feel like my code is far from optimal and when I want to add additional things (like catching HTTP errors, retrying on proxy timeouts,...) I'm just adding more levels of indentation so I'm pretty sure there are other options there!
Any help or feedback to optimize my code below is GREATLY appreciated!
# This script is written for personal research and is not endorsed by Instagram.
# Use at your own risk!
# -*- coding: utf-8 -*-
import csv
import requests
import urllib.request
import json
import re
import random
import time
from fake_useragent import UserAgent
from random import randint
from time import sleep
ua = UserAgent(cache=False)
ts = time.gmtime()
timestamp = time.strftime("%d-%m-%Y %H-%M", ts)
def get_csv_header(top_numb):
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes','MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
return fieldnames
def write_csv_header(filename, headers):
with open(filename, 'w', newline='') as f_out:
writer = csv.DictWriter(f_out, fieldnames=headers)
writer.writeheader()
return
def read_keywords(t_file):
with open(t_file) as f:
keyword_list = f.read().splitlines()
return keyword_list
def read_proxies(p_file):
with open(p_file) as f:
proxy_list = f.read().splitlines()
return proxy_list
#file
data_filename = 'Hashtag Scrape ' + timestamp + '.csv'
KEYWORD_FILE = './hashtags.txt'
DATA_FILE = './' + data_filename
PROXY_FILE = './proxies.txt'
keywords = read_keywords(KEYWORD_FILE)
proxies = read_proxies(PROXY_FILE)
csv_headers = get_csv_header(9)
write_csv_header(DATA_FILE, csv_headers)
#Ask for randomisation input fields
low = input("Please enter minimal delay time (in seconds): ")
low_random = int(low)
high = input("Please enter maximal delay time (in seconds): ")
high_random = int(high)
#get the data
for keyword in keywords:
import urllib, json
if len(proxies)!=0:
proxy_ip = random.choice(proxies)
proxy_support = urllib.request.ProxyHandler({'https':proxy_ip})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
prepare_url = urllib.request.Request(
'https://www.instagram.com/explore/tags/' + urllib.parse.quote_plus(keyword) + '/?__a=1',
headers={
'User-Agent': ua.random
}
)
url = urllib.request.urlopen(prepare_url)
post_info = {}
response = json.load(url) #response is the JSON dump of the url.
#defining some script helpers
x = len(response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'])
i = avg_post_likes = 0
likes_value =
comments_value =
#Getting the general tag data
hashtag_name = response['graphql']['hashtag']['name']
post_count = response['graphql']['hashtag']['edge_hashtag_to_media']['count']
hashtag_url = 'https://www.instagram.com/explore/tags/' + keyword
post_ready_tag = '#' + keyword
top_posts = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges']
#calculate the active days ago
most_recent_post = response['graphql']['hashtag']['edge_hashtag_to_media']['edges'][0]['node']['taken_at_timestamp']
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
most_recent_clean = datetime.strptime(post_cleandate, '%Y-%m-%d')
today = datetime.strptime(str(date.today()),'%Y-%m-%d')
posted_days_ago = relativedelta.relativedelta(today, most_recent_clean).days
while i <=x-1:
#Getting data from top posts
top_post_likes = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']
post_like = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']['count']
post_comment = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_media_to_comment']['count']
likes_value.append(post_like)
comments_value.append(post_comment)
i += 1
print('Writing ' + keyword + ' to output file')
with open(data_filename, 'a', newline='', encoding='utf-8') as data_out:
post_info["Hashtag"] = hashtag_name
post_info["Active Days Ago"] = posted_days_ago
post_info["Post Count"] = post_count
post_info["AVG. Likes"] = round(sum(likes_value)/len(likes_value),2)
post_info["MAX. Likes"] = max(likes_value)
post_info["MIN. Likes"] = min(likes_value)
post_info["AVG. Comments"] = round(sum(comments_value)/len(comments_value),2)
post_info["Hashtag URL"] = hashtag_url
post_info["Post Ready Tag"] = post_ready_tag
csv_writer = csv.DictWriter(data_out, fieldnames=csv_headers)
csv_writer.writerow(post_info)
#Randomly pause script based on input values
sleep(randint(low_random,high_random))
#cleaning up the file:
destination = data_filename[:-4] + '_unique.csv'
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
# Let the user know you are starting, in case you are de-dupping a huge file
print("nRemoving duplicates from %r" % data_filename)
# Initialize variables and counters
unique_lines = set()
source_lines = 0
duplicate_lines = 0
# Loop through data, write uniques to output file, skip duplicates.
for line in data:
source_lines += 1
# Strip out the junk for an easy set check, also saves memory
line_to_check = line.strip('rn')
if line_to_check in unique_lines: # Skip if line is already in set
duplicate_lines += 1
continue
else: # Write if new and append stripped line to list of seen lines
target.write(line)
unique_lines.add(line_to_check)
# Be nice and close out the files
target.close()
data.close()
import os
os.remove(data_filename)
os.rename(destination, data_filename)
print("SUCCESS: Removed %d duplicate line(s) from file with %d line(s)." %
(duplicate_lines, source_lines))
print("Wrote output to %rn" % data_filename)
print("n" + 'ALL DONE !!!! ')
For those interested, this is how the output file looks:
Thanks in advance! <3
python python-3.x web-scraping
New contributor
Why aren't you using their API? instagram.com/developer
– Reinderien
yesterday
Because I don't need to (yet), and don't really want to either :). I can get this data without using tokens and login credentials. So that's my preferred approach.
– ThomasSt
yesterday
I coded something as similar as yours: codereview.stackexchange.com/questions/210613/… If you're interested in more web scraping.
– austingae
yesterday
Please do not update the code in your question to incorporate feedback from answers, doing so goes against the Question + Answer style of Code Review. This is not a forum where you should keep the most updated version in your question. Please see what you may and may not do after receiving answers.
– Zeta
yesterday
add a comment |
Over the past few months, I've been actively using python and I have made a few scripts to scrape #hashtag data from Instagram.
It all started with some basic script I had made early 2017 and I have been adding and modifying it ever since. Over the last few months, I made progress in my own skill of Python, successfully adding things like user agent and proxy rotation.
Now that I have a tool that does exactly what I want, I'm looking to:
- Optimize code structure (it's really copying and pasting mostly) and removing 'crappy' code.
Therefore I'm hoping SO can help me analyze my code and suggest optimizations.
My script does the following:
- It analyzes hashtags from the input file (hashtags.txt)
- It then scrapes data from Instagram (like post count, average engagement,...)
- This data is then stored in a .csv. Which is being processed again afterward to remove duplicates.
I also included user agent randomization and proxy rotation.
However, I feel like my code is far from optimal and when I want to add additional things (like catching HTTP errors, retrying on proxy timeouts,...) I'm just adding more levels of indentation so I'm pretty sure there are other options there!
Any help or feedback to optimize my code below is GREATLY appreciated!
# This script is written for personal research and is not endorsed by Instagram.
# Use at your own risk!
# -*- coding: utf-8 -*-
import csv
import requests
import urllib.request
import json
import re
import random
import time
from fake_useragent import UserAgent
from random import randint
from time import sleep
ua = UserAgent(cache=False)
ts = time.gmtime()
timestamp = time.strftime("%d-%m-%Y %H-%M", ts)
def get_csv_header(top_numb):
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes','MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
return fieldnames
def write_csv_header(filename, headers):
with open(filename, 'w', newline='') as f_out:
writer = csv.DictWriter(f_out, fieldnames=headers)
writer.writeheader()
return
def read_keywords(t_file):
with open(t_file) as f:
keyword_list = f.read().splitlines()
return keyword_list
def read_proxies(p_file):
with open(p_file) as f:
proxy_list = f.read().splitlines()
return proxy_list
#file
data_filename = 'Hashtag Scrape ' + timestamp + '.csv'
KEYWORD_FILE = './hashtags.txt'
DATA_FILE = './' + data_filename
PROXY_FILE = './proxies.txt'
keywords = read_keywords(KEYWORD_FILE)
proxies = read_proxies(PROXY_FILE)
csv_headers = get_csv_header(9)
write_csv_header(DATA_FILE, csv_headers)
#Ask for randomisation input fields
low = input("Please enter minimal delay time (in seconds): ")
low_random = int(low)
high = input("Please enter maximal delay time (in seconds): ")
high_random = int(high)
#get the data
for keyword in keywords:
import urllib, json
if len(proxies)!=0:
proxy_ip = random.choice(proxies)
proxy_support = urllib.request.ProxyHandler({'https':proxy_ip})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
prepare_url = urllib.request.Request(
'https://www.instagram.com/explore/tags/' + urllib.parse.quote_plus(keyword) + '/?__a=1',
headers={
'User-Agent': ua.random
}
)
url = urllib.request.urlopen(prepare_url)
post_info = {}
response = json.load(url) #response is the JSON dump of the url.
#defining some script helpers
x = len(response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'])
i = avg_post_likes = 0
likes_value =
comments_value =
#Getting the general tag data
hashtag_name = response['graphql']['hashtag']['name']
post_count = response['graphql']['hashtag']['edge_hashtag_to_media']['count']
hashtag_url = 'https://www.instagram.com/explore/tags/' + keyword
post_ready_tag = '#' + keyword
top_posts = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges']
#calculate the active days ago
most_recent_post = response['graphql']['hashtag']['edge_hashtag_to_media']['edges'][0]['node']['taken_at_timestamp']
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
most_recent_clean = datetime.strptime(post_cleandate, '%Y-%m-%d')
today = datetime.strptime(str(date.today()),'%Y-%m-%d')
posted_days_ago = relativedelta.relativedelta(today, most_recent_clean).days
while i <=x-1:
#Getting data from top posts
top_post_likes = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']
post_like = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']['count']
post_comment = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_media_to_comment']['count']
likes_value.append(post_like)
comments_value.append(post_comment)
i += 1
print('Writing ' + keyword + ' to output file')
with open(data_filename, 'a', newline='', encoding='utf-8') as data_out:
post_info["Hashtag"] = hashtag_name
post_info["Active Days Ago"] = posted_days_ago
post_info["Post Count"] = post_count
post_info["AVG. Likes"] = round(sum(likes_value)/len(likes_value),2)
post_info["MAX. Likes"] = max(likes_value)
post_info["MIN. Likes"] = min(likes_value)
post_info["AVG. Comments"] = round(sum(comments_value)/len(comments_value),2)
post_info["Hashtag URL"] = hashtag_url
post_info["Post Ready Tag"] = post_ready_tag
csv_writer = csv.DictWriter(data_out, fieldnames=csv_headers)
csv_writer.writerow(post_info)
#Randomly pause script based on input values
sleep(randint(low_random,high_random))
#cleaning up the file:
destination = data_filename[:-4] + '_unique.csv'
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
# Let the user know you are starting, in case you are de-dupping a huge file
print("nRemoving duplicates from %r" % data_filename)
# Initialize variables and counters
unique_lines = set()
source_lines = 0
duplicate_lines = 0
# Loop through data, write uniques to output file, skip duplicates.
for line in data:
source_lines += 1
# Strip out the junk for an easy set check, also saves memory
line_to_check = line.strip('rn')
if line_to_check in unique_lines: # Skip if line is already in set
duplicate_lines += 1
continue
else: # Write if new and append stripped line to list of seen lines
target.write(line)
unique_lines.add(line_to_check)
# Be nice and close out the files
target.close()
data.close()
import os
os.remove(data_filename)
os.rename(destination, data_filename)
print("SUCCESS: Removed %d duplicate line(s) from file with %d line(s)." %
(duplicate_lines, source_lines))
print("Wrote output to %rn" % data_filename)
print("n" + 'ALL DONE !!!! ')
For those interested, this is how the output file looks:
Thanks in advance! <3
python python-3.x web-scraping
New contributor
Over the past few months, I've been actively using python and I have made a few scripts to scrape #hashtag data from Instagram.
It all started with some basic script I had made early 2017 and I have been adding and modifying it ever since. Over the last few months, I made progress in my own skill of Python, successfully adding things like user agent and proxy rotation.
Now that I have a tool that does exactly what I want, I'm looking to:
- Optimize code structure (it's really copying and pasting mostly) and removing 'crappy' code.
Therefore I'm hoping SO can help me analyze my code and suggest optimizations.
My script does the following:
- It analyzes hashtags from the input file (hashtags.txt)
- It then scrapes data from Instagram (like post count, average engagement,...)
- This data is then stored in a .csv. Which is being processed again afterward to remove duplicates.
I also included user agent randomization and proxy rotation.
However, I feel like my code is far from optimal and when I want to add additional things (like catching HTTP errors, retrying on proxy timeouts,...) I'm just adding more levels of indentation so I'm pretty sure there are other options there!
Any help or feedback to optimize my code below is GREATLY appreciated!
# This script is written for personal research and is not endorsed by Instagram.
# Use at your own risk!
# -*- coding: utf-8 -*-
import csv
import requests
import urllib.request
import json
import re
import random
import time
from fake_useragent import UserAgent
from random import randint
from time import sleep
ua = UserAgent(cache=False)
ts = time.gmtime()
timestamp = time.strftime("%d-%m-%Y %H-%M", ts)
def get_csv_header(top_numb):
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes','MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
return fieldnames
def write_csv_header(filename, headers):
with open(filename, 'w', newline='') as f_out:
writer = csv.DictWriter(f_out, fieldnames=headers)
writer.writeheader()
return
def read_keywords(t_file):
with open(t_file) as f:
keyword_list = f.read().splitlines()
return keyword_list
def read_proxies(p_file):
with open(p_file) as f:
proxy_list = f.read().splitlines()
return proxy_list
#file
data_filename = 'Hashtag Scrape ' + timestamp + '.csv'
KEYWORD_FILE = './hashtags.txt'
DATA_FILE = './' + data_filename
PROXY_FILE = './proxies.txt'
keywords = read_keywords(KEYWORD_FILE)
proxies = read_proxies(PROXY_FILE)
csv_headers = get_csv_header(9)
write_csv_header(DATA_FILE, csv_headers)
#Ask for randomisation input fields
low = input("Please enter minimal delay time (in seconds): ")
low_random = int(low)
high = input("Please enter maximal delay time (in seconds): ")
high_random = int(high)
#get the data
for keyword in keywords:
import urllib, json
if len(proxies)!=0:
proxy_ip = random.choice(proxies)
proxy_support = urllib.request.ProxyHandler({'https':proxy_ip})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
prepare_url = urllib.request.Request(
'https://www.instagram.com/explore/tags/' + urllib.parse.quote_plus(keyword) + '/?__a=1',
headers={
'User-Agent': ua.random
}
)
url = urllib.request.urlopen(prepare_url)
post_info = {}
response = json.load(url) #response is the JSON dump of the url.
#defining some script helpers
x = len(response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'])
i = avg_post_likes = 0
likes_value =
comments_value =
#Getting the general tag data
hashtag_name = response['graphql']['hashtag']['name']
post_count = response['graphql']['hashtag']['edge_hashtag_to_media']['count']
hashtag_url = 'https://www.instagram.com/explore/tags/' + keyword
post_ready_tag = '#' + keyword
top_posts = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges']
#calculate the active days ago
most_recent_post = response['graphql']['hashtag']['edge_hashtag_to_media']['edges'][0]['node']['taken_at_timestamp']
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
most_recent_clean = datetime.strptime(post_cleandate, '%Y-%m-%d')
today = datetime.strptime(str(date.today()),'%Y-%m-%d')
posted_days_ago = relativedelta.relativedelta(today, most_recent_clean).days
while i <=x-1:
#Getting data from top posts
top_post_likes = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']
post_like = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']['count']
post_comment = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_media_to_comment']['count']
likes_value.append(post_like)
comments_value.append(post_comment)
i += 1
print('Writing ' + keyword + ' to output file')
with open(data_filename, 'a', newline='', encoding='utf-8') as data_out:
post_info["Hashtag"] = hashtag_name
post_info["Active Days Ago"] = posted_days_ago
post_info["Post Count"] = post_count
post_info["AVG. Likes"] = round(sum(likes_value)/len(likes_value),2)
post_info["MAX. Likes"] = max(likes_value)
post_info["MIN. Likes"] = min(likes_value)
post_info["AVG. Comments"] = round(sum(comments_value)/len(comments_value),2)
post_info["Hashtag URL"] = hashtag_url
post_info["Post Ready Tag"] = post_ready_tag
csv_writer = csv.DictWriter(data_out, fieldnames=csv_headers)
csv_writer.writerow(post_info)
#Randomly pause script based on input values
sleep(randint(low_random,high_random))
#cleaning up the file:
destination = data_filename[:-4] + '_unique.csv'
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
# Let the user know you are starting, in case you are de-dupping a huge file
print("nRemoving duplicates from %r" % data_filename)
# Initialize variables and counters
unique_lines = set()
source_lines = 0
duplicate_lines = 0
# Loop through data, write uniques to output file, skip duplicates.
for line in data:
source_lines += 1
# Strip out the junk for an easy set check, also saves memory
line_to_check = line.strip('rn')
if line_to_check in unique_lines: # Skip if line is already in set
duplicate_lines += 1
continue
else: # Write if new and append stripped line to list of seen lines
target.write(line)
unique_lines.add(line_to_check)
# Be nice and close out the files
target.close()
data.close()
import os
os.remove(data_filename)
os.rename(destination, data_filename)
print("SUCCESS: Removed %d duplicate line(s) from file with %d line(s)." %
(duplicate_lines, source_lines))
print("Wrote output to %rn" % data_filename)
print("n" + 'ALL DONE !!!! ')
For those interested, this is how the output file looks:
Thanks in advance! <3
python python-3.x web-scraping
python python-3.x web-scraping
New contributor
New contributor
edited yesterday
Zeta
15.1k23475
15.1k23475
New contributor
asked yesterday
ThomasSt
1213
1213
New contributor
New contributor
Why aren't you using their API? instagram.com/developer
– Reinderien
yesterday
Because I don't need to (yet), and don't really want to either :). I can get this data without using tokens and login credentials. So that's my preferred approach.
– ThomasSt
yesterday
I coded something as similar as yours: codereview.stackexchange.com/questions/210613/… If you're interested in more web scraping.
– austingae
yesterday
Please do not update the code in your question to incorporate feedback from answers, doing so goes against the Question + Answer style of Code Review. This is not a forum where you should keep the most updated version in your question. Please see what you may and may not do after receiving answers.
– Zeta
yesterday
add a comment |
Why aren't you using their API? instagram.com/developer
– Reinderien
yesterday
Because I don't need to (yet), and don't really want to either :). I can get this data without using tokens and login credentials. So that's my preferred approach.
– ThomasSt
yesterday
I coded something as similar as yours: codereview.stackexchange.com/questions/210613/… If you're interested in more web scraping.
– austingae
yesterday
Please do not update the code in your question to incorporate feedback from answers, doing so goes against the Question + Answer style of Code Review. This is not a forum where you should keep the most updated version in your question. Please see what you may and may not do after receiving answers.
– Zeta
yesterday
Why aren't you using their API? instagram.com/developer
– Reinderien
yesterday
Why aren't you using their API? instagram.com/developer
– Reinderien
yesterday
Because I don't need to (yet), and don't really want to either :). I can get this data without using tokens and login credentials. So that's my preferred approach.
– ThomasSt
yesterday
Because I don't need to (yet), and don't really want to either :). I can get this data without using tokens and login credentials. So that's my preferred approach.
– ThomasSt
yesterday
I coded something as similar as yours: codereview.stackexchange.com/questions/210613/… If you're interested in more web scraping.
– austingae
yesterday
I coded something as similar as yours: codereview.stackexchange.com/questions/210613/… If you're interested in more web scraping.
– austingae
yesterday
Please do not update the code in your question to incorporate feedback from answers, doing so goes against the Question + Answer style of Code Review. This is not a forum where you should keep the most updated version in your question. Please see what you may and may not do after receiving answers.
– Zeta
yesterday
Please do not update the code in your question to incorporate feedback from answers, doing so goes against the Question + Answer style of Code Review. This is not a forum where you should keep the most updated version in your question. Please see what you may and may not do after receiving answers.
– Zeta
yesterday
add a comment |
2 Answers
2
active
oldest
votes
This function:
def get_csv_header(top_numb):
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes','MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
return fieldnames
has a few issues. top_numb
is unused, so delete it. You can both construct and return the list in the same statement, but due to its length I suggest that you add some linebreaks in that list. Finally: per Python 3 docs, fieldnames
must be a sequence but needn't be a list - so make this a tuple ()
and not a list because the data are immutable.
Otherwise:
Remove redundant return
s
i.e. the no-op return
seen in write_csv_header
.
Make a main
function
...for all of your global code, for a couple of reasons - to clean up the global namespace, and to make your code callable as a library for other applications.
Use f-strings
...for strings like this:
data_filename = 'Hashtag Scrape ' + timestamp + '.csv'
that can be:
data_filename = f'Hashtag Scrape {timestamp}.csv'
Write more subroutines
The bulk of your logic within the main for keyword in keywords
loop is quite long. Break this up into several subroutines for legibility and maintainability.
Use requests
You're calling into urllib.request.Request
, but there's usually no good reason to do this. Use requests
instead, which is better in nearly every way.
Apply a linter
This will catch non-PEP8 whitespace (or lack thereof) such as that seen in this statement:
if len(proxies)!=0:
Imports at the top
In the middle of your source, we see:
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
It's usually considered better practice to do all of your imports at the top of the source file.
Don't declare indices that you don't use
This loop:
i = avg_post_likes = 0
while i <=x-1:
# ...
i += 1
should be
for _ in range(x):
# ...
You also need a better name for x
.
Use dict.update
This code:
post_info["Hashtag"] = hashtag_name
post_info["Active Days Ago"] = posted_days_ago
post_info["Post Count"] = post_count
post_info["AVG. Likes"] = round(sum(likes_value)/len(likes_value),2)
post_info["MAX. Likes"] = max(likes_value)
post_info["MIN. Likes"] = min(likes_value)
post_info["AVG. Comments"] = round(sum(comments_value)/len(comments_value),2)
post_info["Hashtag URL"] = hashtag_url
post_info["Post Ready Tag"] = post_ready_tag
can be greatly simplified by use of update
:
post_info.update({
'Hashtag': hashtag_name,
'Active Days Ago': posted_days_ago,
# ...
Use context management
You were doing so well elsewhere in the file! But then we see this:
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
Those should also use with
. You can keep the indentation from getting out-of-control by writing more subroutines.
add a comment |
Update 1
I have adapted my original piece of code based on the suggestions by Reinderien (Thanks for that impressive and thorough answer! Learned a bunch of things.)
Right now, I have modified the following things:
- the loop
for i in range(top_post_count):
- the post_info code by using the
.update
functionality - modified code to use
f
string instead of+
(also applying this to other scripts already! - removed redundant and unused pieces of code
However, I'm still struggling a bit to rewrite:
this section:
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
I managed to rewrite the
data = open
section, but haven't figured out how to do the same with thetarget
functionality, without adding more indentations.
also tried replacing
urllib.request.Request
byrequests
, but that doesn't work. Will need to revisit that and see if I need to replace that whole section (which I probably do)- overall I'm struggling to add more subroutines as I'm not sure which pieces of code would make good ones and how to apply the new subroutines correctly.
- moving the
datetime
import to the head of my script cause it to break, somehow it looks like they can't be used within my loops without recalling them there.
using
csv_writer.writeheader()
also seems to repeat it whenever a keyword is written, this is now captured by my deduplication code, but shouldn't be the case.
This script is written for personal research and is not endorsed by Instagram.
Use at your own risk!
-- coding: utf-8 --
import csv
import requests
from urllib.request import Request, urlopen
import json
import re
import random
import time
import os
from fake_useragent import UserAgent
from random import randint
from time import sleep
ua = UserAgent(cache=False)
ts = time.gmtime()
timestamp = time.strftime("%d-%m-%Y %H-%M", ts)
def read_keywords(t_file):
with open(t_file) as f:
keyword_list = f.read().splitlines()
return keyword_list
def read_proxies(p_file):
with open(p_file) as f:
proxy_list = f.read().splitlines()
return proxy_list
file
data_filename = f'Hashtag Scrape {timestamp}.csv'
KEYWORD_FILE = './hashtags.txt'
DATA_FILE = './' + data_filename
PROXY_FILE = './proxies.txt'
keywords = read_keywords(KEYWORD_FILE)
proxies = read_proxies(PROXY_FILE)
Ask for randomisation input fields
low = input("Please enter minimal delay time (in seconds): ")
low_random = int(low)
high = input("Please enter maximal delay time (in seconds): ")
high_random = int(high)
MAIN PROGRAM
for keyword in keywords:
import urllib, json, requests
if len(proxies)!=0:
proxy_ip = random.choice(proxies)
proxy_support = urllib.request.ProxyHandler({'https':proxy_ip})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
prepare_url = urllib.request.Request(
f'https://www.instagram.com/explore/tags/{urllib.parse.quote_plus(keyword)}?__a=1',
headers={
'User-Agent': ua.random
}
)
url = urllib.request.urlopen(prepare_url)
post_info = {}
response = json.load(url) #response is the JSON dump of the url.
#defining some script helpers
top_post_count = len(response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'])
likes_value = comments_value =
#Getting the general tag data
hashtag_name = response['graphql']['hashtag']['name']
post_count = response['graphql']['hashtag']['edge_hashtag_to_media']['count']
hashtag_url = f'https://www.instagram.com/explore/tags/{keyword}'
post_ready_tag = f'#{keyword}'
top_posts = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges']
#calculate the active days ago
most_recent_post = response['graphql']['hashtag']['edge_hashtag_to_media']['edges'][0]['node']['taken_at_timestamp']
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
most_recent_clean = datetime.strptime(post_cleandate, '%Y-%m-%d')
today = datetime.strptime(str(date.today()),'%Y-%m-%d')
posted_days_ago = relativedelta.relativedelta(today, most_recent_clean).days
for i in range(top_post_count):
#Getting data from top posts
top_post_likes = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']
post_like = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']['count']
post_comment = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_media_to_comment']['count']
likes_value.append(post_like)
comments_value.append(post_comment)
print(f'Writing {keyword} to output file')
with open(data_filename, 'a', newline='', encoding='utf-8') as data_out:
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes',
'MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
csv_writer = csv.DictWriter(data_out, fieldnames=fieldnames)
post_info.update({
'Hashtag': hashtag_name,
'Active Days Ago': posted_days_ago,
'Post Count': post_count,
'AVG. Likes': round(sum(likes_value)/len(likes_value),2),
'MAX. Likes': max(likes_value),
'MIN. Likes': min(likes_value),
'AVG. Comments': round(sum(comments_value)/len(comments_value),2),
'Hashtag URL': hashtag_url,
'Post Ready Tag': post_ready_tag
})
csv_writer.writeheader()
csv_writer.writerow(post_info)
#Randomly pause script based on input values
sleep(randint(low_random,high_random))
cleaning up the file:
destination = data_filename[:-4] + '_unique.csv'
target = open(destination, 'w',encoding='utf-8')
Let the user know you are starting, in case you are de-dupping a huge file
print("nRemoving duplicates from %r" % data_filename)
Initialize variables and counters
unique_lines = set()
source_lines = 0
duplicate_lines = 0
with open(data_filename, 'r',encoding='utf-8') as data:
Loop through data, write uniques to output file, skip duplicates.
for line in data:
source_lines += 1
# Strip out the junk for an easy set check, also saves memory
line_to_check = line.strip('rn')
if line_to_check in unique_lines: # Skip if line is already in set
duplicate_lines += 1
continue
else: # Write if new and append stripped line to list of seen lines
target.write(line)
unique_lines.add(line_to_check)
target.close()
os.remove(data_filename)
os.rename(destination, data_filename)
print("SUCCESS: Removed %d duplicate line(s) from file with %d line(s)." %
(duplicate_lines, source_lines))
print("Wrote output to %rn" % data_filename)
print("n" + 'ALL DONE !!!! ')
add a comment |
Your Answer
StackExchange.ifUsing("editor", function () {
return StackExchange.using("mathjaxEditing", function () {
StackExchange.MarkdownEditor.creationCallbacks.add(function (editor, postfix) {
StackExchange.mathjaxEditing.prepareWmdForMathJax(editor, postfix, [["\$", "\$"]]);
});
});
}, "mathjax-editing");
StackExchange.ifUsing("editor", function () {
StackExchange.using("externalEditor", function () {
StackExchange.using("snippets", function () {
StackExchange.snippets.init();
});
});
}, "code-snippets");
StackExchange.ready(function() {
var channelOptions = {
tags: "".split(" "),
id: "196"
};
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function() {
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled) {
StackExchange.using("snippets", function() {
createEditor();
});
}
else {
createEditor();
}
});
function createEditor() {
StackExchange.prepareEditor({
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: false,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: null,
bindNavPrevention: true,
postfix: "",
imageUploader: {
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
},
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
});
}
});
ThomasSt is a new contributor. Be nice, and check out our Code of Conduct.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f210893%2fscraping-instagram-for-hashtag-data%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
2 Answers
2
active
oldest
votes
2 Answers
2
active
oldest
votes
active
oldest
votes
active
oldest
votes
This function:
def get_csv_header(top_numb):
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes','MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
return fieldnames
has a few issues. top_numb
is unused, so delete it. You can both construct and return the list in the same statement, but due to its length I suggest that you add some linebreaks in that list. Finally: per Python 3 docs, fieldnames
must be a sequence but needn't be a list - so make this a tuple ()
and not a list because the data are immutable.
Otherwise:
Remove redundant return
s
i.e. the no-op return
seen in write_csv_header
.
Make a main
function
...for all of your global code, for a couple of reasons - to clean up the global namespace, and to make your code callable as a library for other applications.
Use f-strings
...for strings like this:
data_filename = 'Hashtag Scrape ' + timestamp + '.csv'
that can be:
data_filename = f'Hashtag Scrape {timestamp}.csv'
Write more subroutines
The bulk of your logic within the main for keyword in keywords
loop is quite long. Break this up into several subroutines for legibility and maintainability.
Use requests
You're calling into urllib.request.Request
, but there's usually no good reason to do this. Use requests
instead, which is better in nearly every way.
Apply a linter
This will catch non-PEP8 whitespace (or lack thereof) such as that seen in this statement:
if len(proxies)!=0:
Imports at the top
In the middle of your source, we see:
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
It's usually considered better practice to do all of your imports at the top of the source file.
Don't declare indices that you don't use
This loop:
i = avg_post_likes = 0
while i <=x-1:
# ...
i += 1
should be
for _ in range(x):
# ...
You also need a better name for x
.
Use dict.update
This code:
post_info["Hashtag"] = hashtag_name
post_info["Active Days Ago"] = posted_days_ago
post_info["Post Count"] = post_count
post_info["AVG. Likes"] = round(sum(likes_value)/len(likes_value),2)
post_info["MAX. Likes"] = max(likes_value)
post_info["MIN. Likes"] = min(likes_value)
post_info["AVG. Comments"] = round(sum(comments_value)/len(comments_value),2)
post_info["Hashtag URL"] = hashtag_url
post_info["Post Ready Tag"] = post_ready_tag
can be greatly simplified by use of update
:
post_info.update({
'Hashtag': hashtag_name,
'Active Days Ago': posted_days_ago,
# ...
Use context management
You were doing so well elsewhere in the file! But then we see this:
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
Those should also use with
. You can keep the indentation from getting out-of-control by writing more subroutines.
add a comment |
This function:
def get_csv_header(top_numb):
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes','MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
return fieldnames
has a few issues. top_numb
is unused, so delete it. You can both construct and return the list in the same statement, but due to its length I suggest that you add some linebreaks in that list. Finally: per Python 3 docs, fieldnames
must be a sequence but needn't be a list - so make this a tuple ()
and not a list because the data are immutable.
Otherwise:
Remove redundant return
s
i.e. the no-op return
seen in write_csv_header
.
Make a main
function
...for all of your global code, for a couple of reasons - to clean up the global namespace, and to make your code callable as a library for other applications.
Use f-strings
...for strings like this:
data_filename = 'Hashtag Scrape ' + timestamp + '.csv'
that can be:
data_filename = f'Hashtag Scrape {timestamp}.csv'
Write more subroutines
The bulk of your logic within the main for keyword in keywords
loop is quite long. Break this up into several subroutines for legibility and maintainability.
Use requests
You're calling into urllib.request.Request
, but there's usually no good reason to do this. Use requests
instead, which is better in nearly every way.
Apply a linter
This will catch non-PEP8 whitespace (or lack thereof) such as that seen in this statement:
if len(proxies)!=0:
Imports at the top
In the middle of your source, we see:
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
It's usually considered better practice to do all of your imports at the top of the source file.
Don't declare indices that you don't use
This loop:
i = avg_post_likes = 0
while i <=x-1:
# ...
i += 1
should be
for _ in range(x):
# ...
You also need a better name for x
.
Use dict.update
This code:
post_info["Hashtag"] = hashtag_name
post_info["Active Days Ago"] = posted_days_ago
post_info["Post Count"] = post_count
post_info["AVG. Likes"] = round(sum(likes_value)/len(likes_value),2)
post_info["MAX. Likes"] = max(likes_value)
post_info["MIN. Likes"] = min(likes_value)
post_info["AVG. Comments"] = round(sum(comments_value)/len(comments_value),2)
post_info["Hashtag URL"] = hashtag_url
post_info["Post Ready Tag"] = post_ready_tag
can be greatly simplified by use of update
:
post_info.update({
'Hashtag': hashtag_name,
'Active Days Ago': posted_days_ago,
# ...
Use context management
You were doing so well elsewhere in the file! But then we see this:
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
Those should also use with
. You can keep the indentation from getting out-of-control by writing more subroutines.
add a comment |
This function:
def get_csv_header(top_numb):
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes','MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
return fieldnames
has a few issues. top_numb
is unused, so delete it. You can both construct and return the list in the same statement, but due to its length I suggest that you add some linebreaks in that list. Finally: per Python 3 docs, fieldnames
must be a sequence but needn't be a list - so make this a tuple ()
and not a list because the data are immutable.
Otherwise:
Remove redundant return
s
i.e. the no-op return
seen in write_csv_header
.
Make a main
function
...for all of your global code, for a couple of reasons - to clean up the global namespace, and to make your code callable as a library for other applications.
Use f-strings
...for strings like this:
data_filename = 'Hashtag Scrape ' + timestamp + '.csv'
that can be:
data_filename = f'Hashtag Scrape {timestamp}.csv'
Write more subroutines
The bulk of your logic within the main for keyword in keywords
loop is quite long. Break this up into several subroutines for legibility and maintainability.
Use requests
You're calling into urllib.request.Request
, but there's usually no good reason to do this. Use requests
instead, which is better in nearly every way.
Apply a linter
This will catch non-PEP8 whitespace (or lack thereof) such as that seen in this statement:
if len(proxies)!=0:
Imports at the top
In the middle of your source, we see:
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
It's usually considered better practice to do all of your imports at the top of the source file.
Don't declare indices that you don't use
This loop:
i = avg_post_likes = 0
while i <=x-1:
# ...
i += 1
should be
for _ in range(x):
# ...
You also need a better name for x
.
Use dict.update
This code:
post_info["Hashtag"] = hashtag_name
post_info["Active Days Ago"] = posted_days_ago
post_info["Post Count"] = post_count
post_info["AVG. Likes"] = round(sum(likes_value)/len(likes_value),2)
post_info["MAX. Likes"] = max(likes_value)
post_info["MIN. Likes"] = min(likes_value)
post_info["AVG. Comments"] = round(sum(comments_value)/len(comments_value),2)
post_info["Hashtag URL"] = hashtag_url
post_info["Post Ready Tag"] = post_ready_tag
can be greatly simplified by use of update
:
post_info.update({
'Hashtag': hashtag_name,
'Active Days Ago': posted_days_ago,
# ...
Use context management
You were doing so well elsewhere in the file! But then we see this:
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
Those should also use with
. You can keep the indentation from getting out-of-control by writing more subroutines.
This function:
def get_csv_header(top_numb):
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes','MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
return fieldnames
has a few issues. top_numb
is unused, so delete it. You can both construct and return the list in the same statement, but due to its length I suggest that you add some linebreaks in that list. Finally: per Python 3 docs, fieldnames
must be a sequence but needn't be a list - so make this a tuple ()
and not a list because the data are immutable.
Otherwise:
Remove redundant return
s
i.e. the no-op return
seen in write_csv_header
.
Make a main
function
...for all of your global code, for a couple of reasons - to clean up the global namespace, and to make your code callable as a library for other applications.
Use f-strings
...for strings like this:
data_filename = 'Hashtag Scrape ' + timestamp + '.csv'
that can be:
data_filename = f'Hashtag Scrape {timestamp}.csv'
Write more subroutines
The bulk of your logic within the main for keyword in keywords
loop is quite long. Break this up into several subroutines for legibility and maintainability.
Use requests
You're calling into urllib.request.Request
, but there's usually no good reason to do this. Use requests
instead, which is better in nearly every way.
Apply a linter
This will catch non-PEP8 whitespace (or lack thereof) such as that seen in this statement:
if len(proxies)!=0:
Imports at the top
In the middle of your source, we see:
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
It's usually considered better practice to do all of your imports at the top of the source file.
Don't declare indices that you don't use
This loop:
i = avg_post_likes = 0
while i <=x-1:
# ...
i += 1
should be
for _ in range(x):
# ...
You also need a better name for x
.
Use dict.update
This code:
post_info["Hashtag"] = hashtag_name
post_info["Active Days Ago"] = posted_days_ago
post_info["Post Count"] = post_count
post_info["AVG. Likes"] = round(sum(likes_value)/len(likes_value),2)
post_info["MAX. Likes"] = max(likes_value)
post_info["MIN. Likes"] = min(likes_value)
post_info["AVG. Comments"] = round(sum(comments_value)/len(comments_value),2)
post_info["Hashtag URL"] = hashtag_url
post_info["Post Ready Tag"] = post_ready_tag
can be greatly simplified by use of update
:
post_info.update({
'Hashtag': hashtag_name,
'Active Days Ago': posted_days_ago,
# ...
Use context management
You were doing so well elsewhere in the file! But then we see this:
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
Those should also use with
. You can keep the indentation from getting out-of-control by writing more subroutines.
answered yesterday
Reinderien
3,842821
3,842821
add a comment |
add a comment |
Update 1
I have adapted my original piece of code based on the suggestions by Reinderien (Thanks for that impressive and thorough answer! Learned a bunch of things.)
Right now, I have modified the following things:
- the loop
for i in range(top_post_count):
- the post_info code by using the
.update
functionality - modified code to use
f
string instead of+
(also applying this to other scripts already! - removed redundant and unused pieces of code
However, I'm still struggling a bit to rewrite:
this section:
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
I managed to rewrite the
data = open
section, but haven't figured out how to do the same with thetarget
functionality, without adding more indentations.
also tried replacing
urllib.request.Request
byrequests
, but that doesn't work. Will need to revisit that and see if I need to replace that whole section (which I probably do)- overall I'm struggling to add more subroutines as I'm not sure which pieces of code would make good ones and how to apply the new subroutines correctly.
- moving the
datetime
import to the head of my script cause it to break, somehow it looks like they can't be used within my loops without recalling them there.
using
csv_writer.writeheader()
also seems to repeat it whenever a keyword is written, this is now captured by my deduplication code, but shouldn't be the case.
This script is written for personal research and is not endorsed by Instagram.
Use at your own risk!
-- coding: utf-8 --
import csv
import requests
from urllib.request import Request, urlopen
import json
import re
import random
import time
import os
from fake_useragent import UserAgent
from random import randint
from time import sleep
ua = UserAgent(cache=False)
ts = time.gmtime()
timestamp = time.strftime("%d-%m-%Y %H-%M", ts)
def read_keywords(t_file):
with open(t_file) as f:
keyword_list = f.read().splitlines()
return keyword_list
def read_proxies(p_file):
with open(p_file) as f:
proxy_list = f.read().splitlines()
return proxy_list
file
data_filename = f'Hashtag Scrape {timestamp}.csv'
KEYWORD_FILE = './hashtags.txt'
DATA_FILE = './' + data_filename
PROXY_FILE = './proxies.txt'
keywords = read_keywords(KEYWORD_FILE)
proxies = read_proxies(PROXY_FILE)
Ask for randomisation input fields
low = input("Please enter minimal delay time (in seconds): ")
low_random = int(low)
high = input("Please enter maximal delay time (in seconds): ")
high_random = int(high)
MAIN PROGRAM
for keyword in keywords:
import urllib, json, requests
if len(proxies)!=0:
proxy_ip = random.choice(proxies)
proxy_support = urllib.request.ProxyHandler({'https':proxy_ip})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
prepare_url = urllib.request.Request(
f'https://www.instagram.com/explore/tags/{urllib.parse.quote_plus(keyword)}?__a=1',
headers={
'User-Agent': ua.random
}
)
url = urllib.request.urlopen(prepare_url)
post_info = {}
response = json.load(url) #response is the JSON dump of the url.
#defining some script helpers
top_post_count = len(response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'])
likes_value = comments_value =
#Getting the general tag data
hashtag_name = response['graphql']['hashtag']['name']
post_count = response['graphql']['hashtag']['edge_hashtag_to_media']['count']
hashtag_url = f'https://www.instagram.com/explore/tags/{keyword}'
post_ready_tag = f'#{keyword}'
top_posts = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges']
#calculate the active days ago
most_recent_post = response['graphql']['hashtag']['edge_hashtag_to_media']['edges'][0]['node']['taken_at_timestamp']
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
most_recent_clean = datetime.strptime(post_cleandate, '%Y-%m-%d')
today = datetime.strptime(str(date.today()),'%Y-%m-%d')
posted_days_ago = relativedelta.relativedelta(today, most_recent_clean).days
for i in range(top_post_count):
#Getting data from top posts
top_post_likes = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']
post_like = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']['count']
post_comment = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_media_to_comment']['count']
likes_value.append(post_like)
comments_value.append(post_comment)
print(f'Writing {keyword} to output file')
with open(data_filename, 'a', newline='', encoding='utf-8') as data_out:
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes',
'MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
csv_writer = csv.DictWriter(data_out, fieldnames=fieldnames)
post_info.update({
'Hashtag': hashtag_name,
'Active Days Ago': posted_days_ago,
'Post Count': post_count,
'AVG. Likes': round(sum(likes_value)/len(likes_value),2),
'MAX. Likes': max(likes_value),
'MIN. Likes': min(likes_value),
'AVG. Comments': round(sum(comments_value)/len(comments_value),2),
'Hashtag URL': hashtag_url,
'Post Ready Tag': post_ready_tag
})
csv_writer.writeheader()
csv_writer.writerow(post_info)
#Randomly pause script based on input values
sleep(randint(low_random,high_random))
cleaning up the file:
destination = data_filename[:-4] + '_unique.csv'
target = open(destination, 'w',encoding='utf-8')
Let the user know you are starting, in case you are de-dupping a huge file
print("nRemoving duplicates from %r" % data_filename)
Initialize variables and counters
unique_lines = set()
source_lines = 0
duplicate_lines = 0
with open(data_filename, 'r',encoding='utf-8') as data:
Loop through data, write uniques to output file, skip duplicates.
for line in data:
source_lines += 1
# Strip out the junk for an easy set check, also saves memory
line_to_check = line.strip('rn')
if line_to_check in unique_lines: # Skip if line is already in set
duplicate_lines += 1
continue
else: # Write if new and append stripped line to list of seen lines
target.write(line)
unique_lines.add(line_to_check)
target.close()
os.remove(data_filename)
os.rename(destination, data_filename)
print("SUCCESS: Removed %d duplicate line(s) from file with %d line(s)." %
(duplicate_lines, source_lines))
print("Wrote output to %rn" % data_filename)
print("n" + 'ALL DONE !!!! ')
add a comment |
Update 1
I have adapted my original piece of code based on the suggestions by Reinderien (Thanks for that impressive and thorough answer! Learned a bunch of things.)
Right now, I have modified the following things:
- the loop
for i in range(top_post_count):
- the post_info code by using the
.update
functionality - modified code to use
f
string instead of+
(also applying this to other scripts already! - removed redundant and unused pieces of code
However, I'm still struggling a bit to rewrite:
this section:
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
I managed to rewrite the
data = open
section, but haven't figured out how to do the same with thetarget
functionality, without adding more indentations.
also tried replacing
urllib.request.Request
byrequests
, but that doesn't work. Will need to revisit that and see if I need to replace that whole section (which I probably do)- overall I'm struggling to add more subroutines as I'm not sure which pieces of code would make good ones and how to apply the new subroutines correctly.
- moving the
datetime
import to the head of my script cause it to break, somehow it looks like they can't be used within my loops without recalling them there.
using
csv_writer.writeheader()
also seems to repeat it whenever a keyword is written, this is now captured by my deduplication code, but shouldn't be the case.
This script is written for personal research and is not endorsed by Instagram.
Use at your own risk!
-- coding: utf-8 --
import csv
import requests
from urllib.request import Request, urlopen
import json
import re
import random
import time
import os
from fake_useragent import UserAgent
from random import randint
from time import sleep
ua = UserAgent(cache=False)
ts = time.gmtime()
timestamp = time.strftime("%d-%m-%Y %H-%M", ts)
def read_keywords(t_file):
with open(t_file) as f:
keyword_list = f.read().splitlines()
return keyword_list
def read_proxies(p_file):
with open(p_file) as f:
proxy_list = f.read().splitlines()
return proxy_list
file
data_filename = f'Hashtag Scrape {timestamp}.csv'
KEYWORD_FILE = './hashtags.txt'
DATA_FILE = './' + data_filename
PROXY_FILE = './proxies.txt'
keywords = read_keywords(KEYWORD_FILE)
proxies = read_proxies(PROXY_FILE)
Ask for randomisation input fields
low = input("Please enter minimal delay time (in seconds): ")
low_random = int(low)
high = input("Please enter maximal delay time (in seconds): ")
high_random = int(high)
MAIN PROGRAM
for keyword in keywords:
import urllib, json, requests
if len(proxies)!=0:
proxy_ip = random.choice(proxies)
proxy_support = urllib.request.ProxyHandler({'https':proxy_ip})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
prepare_url = urllib.request.Request(
f'https://www.instagram.com/explore/tags/{urllib.parse.quote_plus(keyword)}?__a=1',
headers={
'User-Agent': ua.random
}
)
url = urllib.request.urlopen(prepare_url)
post_info = {}
response = json.load(url) #response is the JSON dump of the url.
#defining some script helpers
top_post_count = len(response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'])
likes_value = comments_value =
#Getting the general tag data
hashtag_name = response['graphql']['hashtag']['name']
post_count = response['graphql']['hashtag']['edge_hashtag_to_media']['count']
hashtag_url = f'https://www.instagram.com/explore/tags/{keyword}'
post_ready_tag = f'#{keyword}'
top_posts = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges']
#calculate the active days ago
most_recent_post = response['graphql']['hashtag']['edge_hashtag_to_media']['edges'][0]['node']['taken_at_timestamp']
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
most_recent_clean = datetime.strptime(post_cleandate, '%Y-%m-%d')
today = datetime.strptime(str(date.today()),'%Y-%m-%d')
posted_days_ago = relativedelta.relativedelta(today, most_recent_clean).days
for i in range(top_post_count):
#Getting data from top posts
top_post_likes = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']
post_like = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']['count']
post_comment = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_media_to_comment']['count']
likes_value.append(post_like)
comments_value.append(post_comment)
print(f'Writing {keyword} to output file')
with open(data_filename, 'a', newline='', encoding='utf-8') as data_out:
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes',
'MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
csv_writer = csv.DictWriter(data_out, fieldnames=fieldnames)
post_info.update({
'Hashtag': hashtag_name,
'Active Days Ago': posted_days_ago,
'Post Count': post_count,
'AVG. Likes': round(sum(likes_value)/len(likes_value),2),
'MAX. Likes': max(likes_value),
'MIN. Likes': min(likes_value),
'AVG. Comments': round(sum(comments_value)/len(comments_value),2),
'Hashtag URL': hashtag_url,
'Post Ready Tag': post_ready_tag
})
csv_writer.writeheader()
csv_writer.writerow(post_info)
#Randomly pause script based on input values
sleep(randint(low_random,high_random))
cleaning up the file:
destination = data_filename[:-4] + '_unique.csv'
target = open(destination, 'w',encoding='utf-8')
Let the user know you are starting, in case you are de-dupping a huge file
print("nRemoving duplicates from %r" % data_filename)
Initialize variables and counters
unique_lines = set()
source_lines = 0
duplicate_lines = 0
with open(data_filename, 'r',encoding='utf-8') as data:
Loop through data, write uniques to output file, skip duplicates.
for line in data:
source_lines += 1
# Strip out the junk for an easy set check, also saves memory
line_to_check = line.strip('rn')
if line_to_check in unique_lines: # Skip if line is already in set
duplicate_lines += 1
continue
else: # Write if new and append stripped line to list of seen lines
target.write(line)
unique_lines.add(line_to_check)
target.close()
os.remove(data_filename)
os.rename(destination, data_filename)
print("SUCCESS: Removed %d duplicate line(s) from file with %d line(s)." %
(duplicate_lines, source_lines))
print("Wrote output to %rn" % data_filename)
print("n" + 'ALL DONE !!!! ')
add a comment |
Update 1
I have adapted my original piece of code based on the suggestions by Reinderien (Thanks for that impressive and thorough answer! Learned a bunch of things.)
Right now, I have modified the following things:
- the loop
for i in range(top_post_count):
- the post_info code by using the
.update
functionality - modified code to use
f
string instead of+
(also applying this to other scripts already! - removed redundant and unused pieces of code
However, I'm still struggling a bit to rewrite:
this section:
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
I managed to rewrite the
data = open
section, but haven't figured out how to do the same with thetarget
functionality, without adding more indentations.
also tried replacing
urllib.request.Request
byrequests
, but that doesn't work. Will need to revisit that and see if I need to replace that whole section (which I probably do)- overall I'm struggling to add more subroutines as I'm not sure which pieces of code would make good ones and how to apply the new subroutines correctly.
- moving the
datetime
import to the head of my script cause it to break, somehow it looks like they can't be used within my loops without recalling them there.
using
csv_writer.writeheader()
also seems to repeat it whenever a keyword is written, this is now captured by my deduplication code, but shouldn't be the case.
This script is written for personal research and is not endorsed by Instagram.
Use at your own risk!
-- coding: utf-8 --
import csv
import requests
from urllib.request import Request, urlopen
import json
import re
import random
import time
import os
from fake_useragent import UserAgent
from random import randint
from time import sleep
ua = UserAgent(cache=False)
ts = time.gmtime()
timestamp = time.strftime("%d-%m-%Y %H-%M", ts)
def read_keywords(t_file):
with open(t_file) as f:
keyword_list = f.read().splitlines()
return keyword_list
def read_proxies(p_file):
with open(p_file) as f:
proxy_list = f.read().splitlines()
return proxy_list
file
data_filename = f'Hashtag Scrape {timestamp}.csv'
KEYWORD_FILE = './hashtags.txt'
DATA_FILE = './' + data_filename
PROXY_FILE = './proxies.txt'
keywords = read_keywords(KEYWORD_FILE)
proxies = read_proxies(PROXY_FILE)
Ask for randomisation input fields
low = input("Please enter minimal delay time (in seconds): ")
low_random = int(low)
high = input("Please enter maximal delay time (in seconds): ")
high_random = int(high)
MAIN PROGRAM
for keyword in keywords:
import urllib, json, requests
if len(proxies)!=0:
proxy_ip = random.choice(proxies)
proxy_support = urllib.request.ProxyHandler({'https':proxy_ip})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
prepare_url = urllib.request.Request(
f'https://www.instagram.com/explore/tags/{urllib.parse.quote_plus(keyword)}?__a=1',
headers={
'User-Agent': ua.random
}
)
url = urllib.request.urlopen(prepare_url)
post_info = {}
response = json.load(url) #response is the JSON dump of the url.
#defining some script helpers
top_post_count = len(response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'])
likes_value = comments_value =
#Getting the general tag data
hashtag_name = response['graphql']['hashtag']['name']
post_count = response['graphql']['hashtag']['edge_hashtag_to_media']['count']
hashtag_url = f'https://www.instagram.com/explore/tags/{keyword}'
post_ready_tag = f'#{keyword}'
top_posts = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges']
#calculate the active days ago
most_recent_post = response['graphql']['hashtag']['edge_hashtag_to_media']['edges'][0]['node']['taken_at_timestamp']
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
most_recent_clean = datetime.strptime(post_cleandate, '%Y-%m-%d')
today = datetime.strptime(str(date.today()),'%Y-%m-%d')
posted_days_ago = relativedelta.relativedelta(today, most_recent_clean).days
for i in range(top_post_count):
#Getting data from top posts
top_post_likes = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']
post_like = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']['count']
post_comment = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_media_to_comment']['count']
likes_value.append(post_like)
comments_value.append(post_comment)
print(f'Writing {keyword} to output file')
with open(data_filename, 'a', newline='', encoding='utf-8') as data_out:
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes',
'MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
csv_writer = csv.DictWriter(data_out, fieldnames=fieldnames)
post_info.update({
'Hashtag': hashtag_name,
'Active Days Ago': posted_days_ago,
'Post Count': post_count,
'AVG. Likes': round(sum(likes_value)/len(likes_value),2),
'MAX. Likes': max(likes_value),
'MIN. Likes': min(likes_value),
'AVG. Comments': round(sum(comments_value)/len(comments_value),2),
'Hashtag URL': hashtag_url,
'Post Ready Tag': post_ready_tag
})
csv_writer.writeheader()
csv_writer.writerow(post_info)
#Randomly pause script based on input values
sleep(randint(low_random,high_random))
cleaning up the file:
destination = data_filename[:-4] + '_unique.csv'
target = open(destination, 'w',encoding='utf-8')
Let the user know you are starting, in case you are de-dupping a huge file
print("nRemoving duplicates from %r" % data_filename)
Initialize variables and counters
unique_lines = set()
source_lines = 0
duplicate_lines = 0
with open(data_filename, 'r',encoding='utf-8') as data:
Loop through data, write uniques to output file, skip duplicates.
for line in data:
source_lines += 1
# Strip out the junk for an easy set check, also saves memory
line_to_check = line.strip('rn')
if line_to_check in unique_lines: # Skip if line is already in set
duplicate_lines += 1
continue
else: # Write if new and append stripped line to list of seen lines
target.write(line)
unique_lines.add(line_to_check)
target.close()
os.remove(data_filename)
os.rename(destination, data_filename)
print("SUCCESS: Removed %d duplicate line(s) from file with %d line(s)." %
(duplicate_lines, source_lines))
print("Wrote output to %rn" % data_filename)
print("n" + 'ALL DONE !!!! ')
Update 1
I have adapted my original piece of code based on the suggestions by Reinderien (Thanks for that impressive and thorough answer! Learned a bunch of things.)
Right now, I have modified the following things:
- the loop
for i in range(top_post_count):
- the post_info code by using the
.update
functionality - modified code to use
f
string instead of+
(also applying this to other scripts already! - removed redundant and unused pieces of code
However, I'm still struggling a bit to rewrite:
this section:
data = open(data_filename, 'r',encoding='utf-8')
target = open(destination, 'w',encoding='utf-8')
I managed to rewrite the
data = open
section, but haven't figured out how to do the same with thetarget
functionality, without adding more indentations.
also tried replacing
urllib.request.Request
byrequests
, but that doesn't work. Will need to revisit that and see if I need to replace that whole section (which I probably do)- overall I'm struggling to add more subroutines as I'm not sure which pieces of code would make good ones and how to apply the new subroutines correctly.
- moving the
datetime
import to the head of my script cause it to break, somehow it looks like they can't be used within my loops without recalling them there.
using
csv_writer.writeheader()
also seems to repeat it whenever a keyword is written, this is now captured by my deduplication code, but shouldn't be the case.
This script is written for personal research and is not endorsed by Instagram.
Use at your own risk!
-- coding: utf-8 --
import csv
import requests
from urllib.request import Request, urlopen
import json
import re
import random
import time
import os
from fake_useragent import UserAgent
from random import randint
from time import sleep
ua = UserAgent(cache=False)
ts = time.gmtime()
timestamp = time.strftime("%d-%m-%Y %H-%M", ts)
def read_keywords(t_file):
with open(t_file) as f:
keyword_list = f.read().splitlines()
return keyword_list
def read_proxies(p_file):
with open(p_file) as f:
proxy_list = f.read().splitlines()
return proxy_list
file
data_filename = f'Hashtag Scrape {timestamp}.csv'
KEYWORD_FILE = './hashtags.txt'
DATA_FILE = './' + data_filename
PROXY_FILE = './proxies.txt'
keywords = read_keywords(KEYWORD_FILE)
proxies = read_proxies(PROXY_FILE)
Ask for randomisation input fields
low = input("Please enter minimal delay time (in seconds): ")
low_random = int(low)
high = input("Please enter maximal delay time (in seconds): ")
high_random = int(high)
MAIN PROGRAM
for keyword in keywords:
import urllib, json, requests
if len(proxies)!=0:
proxy_ip = random.choice(proxies)
proxy_support = urllib.request.ProxyHandler({'https':proxy_ip})
opener = urllib.request.build_opener(proxy_support)
urllib.request.install_opener(opener)
prepare_url = urllib.request.Request(
f'https://www.instagram.com/explore/tags/{urllib.parse.quote_plus(keyword)}?__a=1',
headers={
'User-Agent': ua.random
}
)
url = urllib.request.urlopen(prepare_url)
post_info = {}
response = json.load(url) #response is the JSON dump of the url.
#defining some script helpers
top_post_count = len(response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'])
likes_value = comments_value =
#Getting the general tag data
hashtag_name = response['graphql']['hashtag']['name']
post_count = response['graphql']['hashtag']['edge_hashtag_to_media']['count']
hashtag_url = f'https://www.instagram.com/explore/tags/{keyword}'
post_ready_tag = f'#{keyword}'
top_posts = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges']
#calculate the active days ago
most_recent_post = response['graphql']['hashtag']['edge_hashtag_to_media']['edges'][0]['node']['taken_at_timestamp']
import datetime
from dateutil import relativedelta
post_datetime = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d %H:%M:%S')
post_cleandate = datetime.datetime.fromtimestamp(most_recent_post).strftime('%Y-%m-%d')
from datetime import datetime, date
most_recent_clean = datetime.strptime(post_cleandate, '%Y-%m-%d')
today = datetime.strptime(str(date.today()),'%Y-%m-%d')
posted_days_ago = relativedelta.relativedelta(today, most_recent_clean).days
for i in range(top_post_count):
#Getting data from top posts
top_post_likes = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']
post_like = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_liked_by']['count']
post_comment = response['graphql']['hashtag']['edge_hashtag_to_top_posts']['edges'][i]['node']['edge_media_to_comment']['count']
likes_value.append(post_like)
comments_value.append(post_comment)
print(f'Writing {keyword} to output file')
with open(data_filename, 'a', newline='', encoding='utf-8') as data_out:
fieldnames = ['Hashtag','Active Days Ago','Post Count','AVG. Likes',
'MAX. Likes','MIN. Likes','AVG. Comments','Hashtag URL','Post Ready Tag']
csv_writer = csv.DictWriter(data_out, fieldnames=fieldnames)
post_info.update({
'Hashtag': hashtag_name,
'Active Days Ago': posted_days_ago,
'Post Count': post_count,
'AVG. Likes': round(sum(likes_value)/len(likes_value),2),
'MAX. Likes': max(likes_value),
'MIN. Likes': min(likes_value),
'AVG. Comments': round(sum(comments_value)/len(comments_value),2),
'Hashtag URL': hashtag_url,
'Post Ready Tag': post_ready_tag
})
csv_writer.writeheader()
csv_writer.writerow(post_info)
#Randomly pause script based on input values
sleep(randint(low_random,high_random))
cleaning up the file:
destination = data_filename[:-4] + '_unique.csv'
target = open(destination, 'w',encoding='utf-8')
Let the user know you are starting, in case you are de-dupping a huge file
print("nRemoving duplicates from %r" % data_filename)
Initialize variables and counters
unique_lines = set()
source_lines = 0
duplicate_lines = 0
with open(data_filename, 'r',encoding='utf-8') as data:
Loop through data, write uniques to output file, skip duplicates.
for line in data:
source_lines += 1
# Strip out the junk for an easy set check, also saves memory
line_to_check = line.strip('rn')
if line_to_check in unique_lines: # Skip if line is already in set
duplicate_lines += 1
continue
else: # Write if new and append stripped line to list of seen lines
target.write(line)
unique_lines.add(line_to_check)
target.close()
os.remove(data_filename)
os.rename(destination, data_filename)
print("SUCCESS: Removed %d duplicate line(s) from file with %d line(s)." %
(duplicate_lines, source_lines))
print("Wrote output to %rn" % data_filename)
print("n" + 'ALL DONE !!!! ')
answered yesterday
community wiki
ThomasSt
add a comment |
add a comment |
ThomasSt is a new contributor. Be nice, and check out our Code of Conduct.
ThomasSt is a new contributor. Be nice, and check out our Code of Conduct.
ThomasSt is a new contributor. Be nice, and check out our Code of Conduct.
ThomasSt is a new contributor. Be nice, and check out our Code of Conduct.
Thanks for contributing an answer to Code Review Stack Exchange!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
Use MathJax to format equations. MathJax reference.
To learn more, see our tips on writing great answers.
Some of your past answers have not been well-received, and you're in danger of being blocked from answering.
Please pay close attention to the following guidance:
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function () {
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f210893%2fscraping-instagram-for-hashtag-data%23new-answer', 'question_page');
}
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function () {
StackExchange.helpers.onClickDraftSave('#login-link');
});
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Why aren't you using their API? instagram.com/developer
– Reinderien
yesterday
Because I don't need to (yet), and don't really want to either :). I can get this data without using tokens and login credentials. So that's my preferred approach.
– ThomasSt
yesterday
I coded something as similar as yours: codereview.stackexchange.com/questions/210613/… If you're interested in more web scraping.
– austingae
yesterday
Please do not update the code in your question to incorporate feedback from answers, doing so goes against the Question + Answer style of Code Review. This is not a forum where you should keep the most updated version in your question. Please see what you may and may not do after receiving answers.
– Zeta
yesterday