Streaming tweets using Twitter V2 API | Tweepy
Get live tweets using Twitter V2 API and store them in a database.
With the Twitter API v2, streaming tweets works differently than it did before. Today we’re going to see how to use Tweepy’s StreamingClient to stream tweets and store them in an SQLite3 database.
About Twitter V2 API
For streaming tweets, you will most likely need to apply for “Elevated” access.
The application process is fairly simple. After you submit the application, you will receive an “approval” email from the Twitter Dev team.
Things to be done on your Twitter developer portal
After you’ve got your **Elevated** access, visit the Developer portal to get your projects and apps ready.
Move to the projects and apps menu, present on the left side of the developer portal, and add an application as required.
Click on “Add app” and fill in the following:
1. Select App environment
2. App name
3. Keys & Token
Next, you will get your API keys and tokens along with a bearer token.
💡 Save them, because we’ll need them to make requests to the Twitter API.
Now, let’s move on to the next section.
Installing tweepy
Installing Tweepy is pretty straightforward 📏.
The official tweepy documentation has everything we need. Make sure to have a look at it.
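1. Make a Python virtual environment and activate it
python -m venv venv
source venv/bin/activate  # on Windows: venv\Scripts\activate
2. Install tweepy, plus python-dotenv, which the code below uses to read the .env file
pip install tweepy python-dotenv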
See, that’s not hard😸.
Now that we are done with the requirements, we can move to the coding section.
Let’s write some code
Before that, let’s structure our code.
1. Make a database directory where we’ll store the SQLite DB files.
2. A main.py file where all our code goes, and
3. A .env file that will store all our API keys and tokens.
💡 For this project, I have put everything into one file, but you can always refactor it into separate modules as required.
Now, we are ready! 🚗
1. Store the API keys and tokens in a .env file
API_KEY="apikeygoeshere"
API_KEY_SECRET="apikeysecretgoeshere"
ACCESS_TOKEN="accesstokengoeshere"
ACCESS_TOKEN_SECRET="accesstokensecretgoeshere"
BEARER_TOKEN="bearertokengoeshere"
2. Importing all necessary packages
from dotenv import load_dotenv
import os
import sqlite3
import tweepy
import time
import argparse
3. Loading the API credentials
load_dotenv()
api_key = os.getenv("API_KEY")
api_key_secret = os.getenv("API_KEY_SECRET")
# the names must match the keys in the .env file
access_token = os.getenv("ACCESS_TOKEN")
access_token_secret = os.getenv("ACCESS_TOKEN_SECRET")
# StreamingClient only needs the bearer token
bearer_token = os.getenv("BEARER_TOKEN")
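A misspelled name in the .env file only shows up later as a confusing authentication error, so it can help to check the credentials right after loading them. Here is a minimal sketch using only the standard library. The helper name `missing_credentials` and the `REQUIRED` tuple are my own and not part of Tweepy or python-dotenv.

```python
import os

# Names expected in the .env file (assumed to match the keys listed in step 1).
REQUIRED = (
    "API_KEY",
    "API_KEY_SECRET",
    "ACCESS_TOKEN",
    "ACCESS_TOKEN_SECRET",
    "BEARER_TOKEN",
)


def missing_credentials(env=None):
    """Return the names in REQUIRED that are unset or empty in env."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED if not env.get(name)]
```

Calling `missing_credentials()` after `load_dotenv()` returns an empty list when every value is present, and the list of missing names otherwise.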
4. Creating the database
conn = sqlite3.connect("./database/tweets.db")
print("DB created!")
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS tweets (username TEXT, tweet TEXT)")
print("Table created")
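One thing to watch for: sqlite3.connect creates the database file if it is missing, but not the directory it lives in, so the database directory has to exist first. Here is a minimal sketch that takes care of both. `open_tweet_db` is a hypothetical helper name, and the table layout is the same as in this step.

```python
import os
import sqlite3


def open_tweet_db(path):
    """Create the parent directory if needed, then open the DB and the table."""
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE IF NOT EXISTS tweets (username TEXT, tweet TEXT)")
    conn.commit()
    return conn
```

With this in place, `conn = open_tweet_db("./database/tweets.db")` works even on a fresh checkout where the database directory has not been created yet.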
5. Creating the Streaming class
class TweetStreamV2(tweepy.StreamingClient):
    # holds the text and the username of the tweet being processed
    new_tweet = {}

    def on_connect(self):
        print("Connected!")

    def on_includes(self, includes):
        # runs after on_tweet; skip if on_tweet did not keep this tweet
        if "tweet" not in self.new_tweet:
            return
        self.new_tweet["username"] = includes["users"][0].username
        print(self.new_tweet)
        # insert the tweet into the db
        cursor.execute(
            "INSERT INTO tweets VALUES (?,?)",
            (
                self.new_tweet["username"],
                self.new_tweet["tweet"],
            ),
        )
        conn.commit()
        print("tweet added to db!")
        print("-" * 30)
        # start fresh for the next tweet
        self.new_tweet.clear()

    def on_tweet(self, tweet):
        # keep only original tweets (ones that reference no other tweet)
        if tweet.referenced_tweets is None:
            self.new_tweet["tweet"] = tweet.text
            print(tweet.text)
            time.sleep(0.3)
What does the code say?
💡 Before moving into details, please have a look at the StreamingClient documentation. It will make things clearer.
- The on_connect method prints a “Connected” message, letting us know that we have successfully connected to the Twitter API.
- The on_tweet method receives each tweet, keeps it only if it does not reference another tweet, and adds its text to the hashmap (the new_tweet dictionary).
- The on_includes method receives the expanded user details and adds the username to the hashmap.
- Finally, the data in the hashmap is inserted into the tweets table.
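The hand-off between the two callbacks can be tried without a network connection. The sketch below is a stand-in, not Tweepy code: `TweetBuffer`, `add_text`, and `add_username` are hypothetical names that mimic what on_tweet and on_includes do with the hashmap, and it writes to an in-memory SQLite database. The guard in `add_username` is an assumption of mine; it skips the insert when no text was stored for the current tweet.

```python
import sqlite3


class TweetBuffer:
    """Pairs a tweet's text with its author, then writes one row."""

    def __init__(self, conn):
        self.conn = conn
        self.new_tweet = {}

    def add_text(self, text):
        # Plays the role of on_tweet: remember the text of the current tweet.
        self.new_tweet["tweet"] = text

    def add_username(self, username):
        # Plays the role of on_includes: complete the row and store it.
        if "tweet" not in self.new_tweet:
            return False  # no text was kept for this tweet, so skip it
        self.conn.execute(
            "INSERT INTO tweets VALUES (?, ?)",
            (username, self.new_tweet["tweet"]),
        )
        self.conn.commit()
        self.new_tweet.clear()  # start fresh for the next tweet
        return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tweets (username TEXT, tweet TEXT)")
buffer = TweetBuffer(conn)

buffer.add_text("hello world")
buffer.add_username("alice")  # stored
buffer.add_username("bob")    # skipped: no text is waiting

print(conn.execute("SELECT * FROM tweets").fetchall())
```

Only the row for alice ends up in the table, because the second username arrives without a tweet text to pair it with.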
6. Main function
def main():
    # get args
    parser = argparse.ArgumentParser()
    parser.add_argument("search_query", help="Twitter search query")
    args = parser.parse_args()
    query = args.search_query

    stream = TweetStreamV2(bearer_token)

    # delete previous rules, if any (data is None when no rules exist yet)
    prev_rules = stream.get_rules().data
    if prev_rules:
        stream.delete_rules([rule.id for rule in prev_rules])

    # add new query
    stream.add_rules(tweepy.StreamRule(query))
    print(stream.get_rules())

    stream.filter(
        # referenced_tweets is requested so the check in on_tweet has data
        tweet_fields=["created_at", "lang", "referenced_tweets"],
        expansions=["author_id"],
        user_fields=["username", "name"],
    )


if __name__ == "__main__":
    main()
What does the code say?
- The Python script takes one argument, search_query.
- This argument is added to the stream rules after deleting the previously added rules.
- Rules are the search queries that the stream takes as input. There can be more than one rule, and each rule has a `value`, a `tag` and an `id`.
- The `id` is passed to the `delete_rules` method to delete a rule.
💡 I suggest you refer to the official documentation for more details on adding and deleting rules.
- Next, we have the filter method. It starts the stream, which delivers the tweets that match the rules together with the fields we asked for.
The parameters that filter accepts are:
expansions (list[str] | str) — expansions
media_fields (list[str] | str) — media_fields
place_fields (list[str] | str) — place_fields
poll_fields (list[str] | str) — poll_fields
tweet_fields (list[str] | str) — tweet_fields
user_fields (list[str] | str) — user_fields
threaded (bool) — Whether or not to use a thread to run the stream
💡 Refer to the official documentation for the values each parameter accepts.
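Replacing the rules can be wrapped in a small helper. One detail worth guarding against: as far as I can tell, the `data` attribute of the get_rules response is None when no rules exist yet, so indexing it directly fails on a fresh app. The sketch below assumes that behaviour. `replace_rules` is a hypothetical helper, and `FakeStream` exists only so the example runs without credentials.

```python
from types import SimpleNamespace


def replace_rules(stream, new_rule):
    """Delete every existing rule on the stream, then add new_rule.

    Returns the number of rules that were deleted.
    """
    existing = stream.get_rules().data or []  # data may be None: no rules yet
    if existing:
        stream.delete_rules([rule.id for rule in existing])
    stream.add_rules(new_rule)
    return len(existing)


class FakeStream:
    """Stand-in with the three rule methods used above (not Tweepy code)."""

    def __init__(self, rules=None):
        self.rules = rules  # None models a stream without any rules

    def get_rules(self):
        return SimpleNamespace(data=self.rules)

    def delete_rules(self, ids):
        self.rules = [r for r in self.rules if r.id not in ids] or None

    def add_rules(self, rule):
        self.rules = (self.rules or []) + [rule]


fresh = FakeStream()
print(replace_rules(fresh, SimpleNamespace(id="1", value="Spiderman")))

used = FakeStream([SimpleNamespace(id="7", value="Batman")])
print(replace_rules(used, SimpleNamespace(id="8", value="Spiderman")))
```

With the real client, the call would look like `replace_rules(stream, tweepy.StreamRule(query))`.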
Let’s try out our app
To test that everything works, we pass Spiderman as the search query while running the main.py file.
$ python main.py Spiderman
This will create a tweets.db file inside the database directory.
Inside the db file, you will find a tweets table with `username` and `tweet` as its columns.
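To check the result without a database browser, the rows can be read back with the sqlite3 module. Here is a minimal sketch, assuming the table layout used in this post. `read_tweets` is a hypothetical helper name.

```python
import sqlite3


def read_tweets(path, limit=5):
    """Return up to `limit` (username, tweet) rows from the tweets table."""
    conn = sqlite3.connect(path)
    try:
        rows = conn.execute(
            "SELECT username, tweet FROM tweets LIMIT ?", (limit,)
        ).fetchall()
    finally:
        conn.close()
    return rows
```

For example, `read_tweets("./database/tweets.db")` returns up to five rows as (username, tweet) tuples.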
Conclusion
This example shows how to use the Twitter V2 API from Python with the Tweepy library to get live tweets and store them in a database. You can also use CSV or JSON files to store tweets.
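For the CSV route, here is a minimal sketch using the standard csv module. `append_tweet_csv` is a hypothetical helper, and the column names mirror the database table.

```python
import csv
import os


def append_tweet_csv(path, username, tweet):
    """Append one row to a CSV file, writing the header for a new file."""
    is_new = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        if is_new:
            writer.writerow(["username", "tweet"])
        writer.writerow([username, tweet])
```

The csv module quotes commas and line breaks inside a tweet, so each tweet still reads back as a single field.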
I will keep adding more blogs to this series.
🤝Follow me on Twitter.
🌎Explore, 🎓Learn, 👷‍♂️Build.
Happy Coding💛