Tweet analysis service
This lesson demonstrates how Redis can be used as the foundation for an analytics-style back-end service. The application showcased here consists of multiple services which ingest tweets in real time, store them in Redis and allow the tweet information to be queried. The `SET` and `HASH` data structures are leveraged along with the reliable queue pattern built on Redis `LIST`s.
Real-time tweet stats service
The service allows users to define a set of tweet keywords/terms they are interested in e.g. since you're reading this, you might be interested in redis, nosql, database, golang etc. It ingests tweets in real time and allows users to query those tweets with criteria based on the keywords/terms they previously defined. It is then possible to

find all the tweets with a specific keyword e.g. tweets which contain the word `redis`
find all the tweets with a combination of keywords e.g. tweets which contain both `golang` and `nosql`, or tweets which contain `database` or `redis`
use the above criteria with an added date filter e.g. tweets posted on 15th May 2018 which contain the terms `redis`, `golang` and `nosql`
The application comprises three distinct microservices, each of which is responsible for a specific piece of the overall functionality
Tweets ingestion microservice - ingests real time tweets from the Twitter Streaming API and pushes them to Redis
Tweets processor microservice - uses the reliable consumer pattern to process each tweet and saves them to Redis for further analysis
Tweets statistics microservice - exposes a REST API for users to be able to query tweet data
Technical stack
The sample app uses
go-twitter to tap into the Twitter Streaming API
gin-gonic for REST based services
go-redis as the Redis Go client
pre-built Docker image for Redis
Docker Compose to run the solution with a single command
Schema
`tweets` - a Redis `LIST` which stores raw tweets in JSON format
`tweet:[id]` - a `HASH` containing tweet details like tweet ID, tweeter, date, tweet text and the matched keywords e.g. `tweet:987654321`
`keyword_tweets:[term]` - a `SET` which stores tweets (only the IDs) for a specific keyword e.g. `keyword_tweets:java`
`keyword_tweets:[term]:[date]` - another `SET` which stores tweets (only the IDs) for a specific keyword posted on a specific date e.g. `keyword_tweets:nosql:2018-05-10`
Implementation details
Let's look at some of the internal details. You can grab the source code from GitHub.
Tweet Ingestion service
The code package structure for this service is as follows
The bulk of this service is implemented in the `Start` function (in `service/tweets-listener.go`). Its task is to tap into the Twitter Streaming API and fetch tweets based on the user defined keyword(s).
the list of keywords can be provided using the `TWITTER_TRACKED_TERMS` environment variable (comma separated) and defaults to `trump,realDonaldTrump,redis,golang,database,nosql`
Each tweet is serialized to JSON and stored in the `tweets` `LIST` in Redis using `LPUSH`.
Redis as a Queue
In addition to the traditional use cases of adding and querying data, Redis `LIST`s are heavily used as queues. This is made possible by commands like `LPUSH` and `RPUSH` which allow you to add data to the head and tail of the `LIST` respectively (note that these are constant time operations irrespective of the list size!) and then extract (and remove at the same time) items, probably in another process/program, using `LPOP` and `RPOP`. `LIST`s also support blocking variations of these operations i.e. `BLPOP` and `BRPOP`. These commands block for the specified time period (or indefinitely if it's set to 0) waiting for an item to appear in the queue and then pop it out. Another great property is that if multiple programs/processes are involved, each of them receives a unique item from the queue. This means that the processing workload can be distributed among multiple processes and can be easily scaled horizontally
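To make the pattern concrete, here is a minimal, self-contained sketch using the go-redis client; the list name and the value are illustrative, not taken from the project

```go
package main

import (
	"fmt"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// producer side: add an item at the head of the list
	client.LPush("jobs", "job-1")

	// consumer side: block (indefinitely, since the timeout is 0) until
	// an item is available at the tail, then pop it
	result, err := client.BRPop(0, "jobs").Result()
	if err != nil {
		panic(err)
	}
	// BRPop returns a [key, value] pair
	fmt.Println("popped", result[1], "from", result[0])
}
```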
Bootstrap
The entry point to the service is `main.go` which sets up the REST API routes for the tweets listener life cycle manager
Tweet Ingestion Service Lifecycle Manager (LCM)
A convenient REST API can be used to start/stop the tweet ingestion service - thanks to the `StartServiceHandler` and `StopServiceHandler` handler functions in `lcm/api.go` which allow the user to start and stop the service using HTTP `GET` and `DELETE` respectively e.g. to start the tweets ingestion service, you send an HTTP `GET` to `/tweets/producer`, and a `DELETE` to the same URI to stop it
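Here is a minimal sketch of how such routes might be wired up with gin-gonic; the handler bodies are placeholders, not the project's actual implementations

```go
package main

import (
	"net/http"

	"github.com/gin-gonic/gin"
)

// placeholder standing in for lcm/api.go's StartServiceHandler
func StartServiceHandler(c *gin.Context) {
	// start the tweets listener here
	c.Status(http.StatusOK)
}

// placeholder standing in for lcm/api.go's StopServiceHandler
func StopServiceHandler(c *gin.Context) {
	// signal the tweets listener to stop here
	c.Status(http.StatusOK)
}

func main() {
	r := gin.Default()
	r.GET("/tweets/producer", StartServiceHandler)
	r.DELETE("/tweets/producer", StopServiceHandler)
	r.Run(":8080") // port per docker-compose.yml
}
```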
The handler methods delegate the work to the core implementation which is a part of `service/tweets-listener.go`
As mentioned earlier, the `Start()` function is the workhorse of the service
It starts by connecting to Redis
.. followed by Twitter
Then, a `*twitter.Stream` is created with the specified parameters, which include the tweet filtering criteria based on the keywords provided by the user
We then define a `twitter.SwitchDemux` which is nothing but a function which defines what to do when a Tweet is received. In this case, the implementation is to push it to Redis in JSON format
The JSON serialization happens in two steps: the tweet data is first converted to a `tweetInfo` struct which is then converted to a JSON string using `json.Marshal`
The twitter stream listener is started as a separate goroutine
.. and another goroutine is started to make sure that the stream is closed when the service is stopped via the REST API. It blocks on the `apiStopChannel` and closes the stream as well as the Redis connection.
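Putting these pieces together, here is a condensed sketch of the ingestion flow. The `tweetInfo` struct and its fields are illustrative rather than the project's exact definitions, and the shutdown logic is omitted

```go
package main

import (
	"encoding/json"
	"log"
	"os"
	"strings"

	"github.com/dghubble/go-twitter/twitter"
	"github.com/dghubble/oauth1"
	"github.com/go-redis/redis"
)

// tweetInfo is illustrative; the project's actual struct may differ
type tweetInfo struct {
	ID      int64  `json:"id"`
	Tweeter string `json:"tweeter"`
	Text    string `json:"text"`
}

func main() {
	redisClient := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// authenticate against Twitter using the same credentials that
	// docker-compose.yml passes in via environment variables
	config := oauth1.NewConfig(os.Getenv("TWITTER_CONSUMER_KEY"), os.Getenv("TWITTER_CONSUMER_SECRET"))
	token := oauth1.NewToken(os.Getenv("TWITTER_ACCESS_TOKEN"), os.Getenv("TWITTER_ACCESS_TOKEN_SECRET"))
	twitterClient := twitter.NewClient(config.Client(oauth1.NoContext, token))

	// what to do for each received tweet: serialize and LPUSH
	demux := twitter.NewSwitchDemux()
	demux.Tweet = func(tweet *twitter.Tweet) {
		payload, err := json.Marshal(tweetInfo{ID: tweet.ID, Tweeter: tweet.User.ScreenName, Text: tweet.Text})
		if err != nil {
			log.Println("marshal failed:", err)
			return
		}
		redisClient.LPush("tweets", string(payload))
	}

	// filter the stream by the tracked terms
	terms := strings.Split(os.Getenv("TWITTER_TRACKED_TERMS"), ",")
	stream, err := twitterClient.Streams.Filter(&twitter.StreamFilterParams{
		Track:         terms,
		StallWarnings: twitter.Bool(true),
	})
	if err != nil {
		log.Fatal(err)
	}
	// listen for messages on a separate goroutine
	go demux.HandleChan(stream.Messages)

	select {} // block forever (the real service stops via the REST API)
}
```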
Tweet Consumer service
It's responsible for consuming the tweets enqueued in the Redis `LIST` (`tweets`) by the Tweet Ingestion service. It blocks and waits for tweets to appear, processes them and saves them back to Redis such that they are ready for further analysis. This is done in a reliable manner, thanks to the `BRPOPLPUSH` command
Reliable processing
It is possible that the data obtained by the `*POP` commands specified above is received but not processed for some reason e.g. if the consumer application crashes. This can lead to messages/events/data getting lost. There is a reliable alternative using `RPOPLPUSH` or its blocking variation `BRPOPLPUSH`. It simply picks up the data from the tail of the source queue and puts it in a destination queue of your choice (which, by the way, can be the same as the source queue!). What's special about this process is that it is atomic and reliable in nature i.e. the transfer operation from one queue to another will either happen successfully or fail

if the transfer is successful, you will have your data safely backed up in another queue
if the pop-and-push fails, you can handle this in your application and retry it (your data/event is still safe and under your control)
once the processing is done, the data can be removed (with `LREM`) from the back-up list
even if the consumer process (which executed the transfer) fails to process the data or crashes, it's always possible to pick up the data from the back-up list, put it back in the original queue and the processing cycle will begin once again
The logic is contained within `tweets-consumer.go`. `BRPOPLPUSH` is used in an infinite `for` loop for continuous tweet consumption
The `process` function ensures data is stored in `HASH`es and `SET`s to make it ready for analysis/query

it de-serializes the tweet information from its JSON form into a Go struct (`tweet`)
uses `HMSET` to store the tweet details in a `HASH` whose naming format is `tweet:[tweetID]`
also stores the tweet info in two separate `SET`s using `SADD` - `keyword_tweets:[term]` (e.g. if a tweet contains the keywords `redis` and `nosql`, it will be stored in both the `SET`s `keyword_tweets:redis` and `keyword_tweets:nosql`) and `keyword_tweets:[term]:[created_date]` (e.g. `keyword_tweets:java:2018-05-10`)
this is repeated for all the matched keywords/terms
Since the logic involves three distinct calls to Redis, pipelining was used to make this more efficient i.e. the `HMSET` and `SADD` commands are invoked in a batch - this brings us down from three round trips to one.
Once the tweet information is stored (processed successfully), the entry is removed from the list using `LREM`
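Here is a sketch of what the pipelined write might look like with go-redis. The struct fields and the `processTweet` helper are illustrative, though the key names follow the schema described earlier

```go
package consumer

import (
	"encoding/json"
	"strconv"

	"github.com/go-redis/redis"
)

// processTweet is a sketch; the struct fields are assumed to match
// whatever the ingestion service serialized
func processTweet(client *redis.Client, payload string) error {
	var t struct {
		ID       int64    `json:"id"`
		Tweeter  string   `json:"tweeter"`
		Text     string   `json:"text"`
		Created  string   `json:"created"`  // e.g. 2018-05-10
		Keywords []string `json:"keywords"` // matched terms
	}
	if err := json.Unmarshal([]byte(payload), &t); err != nil {
		return err
	}

	tweetID := strconv.FormatInt(t.ID, 10)

	// batch all writes so they go to Redis in a single round trip
	pipe := client.Pipeline()

	// tweet:[tweetID] HASH with the tweet details
	pipe.HMSet("tweet:"+tweetID, map[string]interface{}{
		"id": tweetID, "tweeter": t.Tweeter, "text": t.Text, "created": t.Created,
	})

	// keyword_tweets:[term] and keyword_tweets:[term]:[date] SETs
	for _, term := range t.Keywords {
		pipe.SAdd("keyword_tweets:"+term, tweetID)
		pipe.SAdd("keyword_tweets:"+term+":"+t.Created, tweetID)
	}

	_, err := pipe.Exec()
	return err
}
```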
Tweets statistics service
This module exposes a REST API to extract tweet information based on a few user specified criteria - `tweets-stats-api.go` is where all the logic resides
The first criterion allows the user to search tweets based on keywords/terms

you can specify one or more keywords (`keywords` query parameter), and,
choose whether you want to apply the `AND` or `OR` criterion on top of it (`op` query parameter)
The second criterion adds the date dimension (along with the keyword) - using the `date` query parameter. The logic is the same as the previous API. The difference is the `SET`s which are queried - these are the ones in which date-wise keyword information is stored i.e. `keyword_tweets:[keyword]:[date]` e.g. `keyword_tweets:java:2018-05-10`
`findTweetsHandler` is the function which serves as the entry point of the REST API. It extracts the required information from the URL i.e. keyword(s), operation (`AND` or `OR`) and date, and passes the work on to a more generic `findTweets` function
The first part of the `findTweets` function calculates the `SET`s from which the tweet info needs to be extracted. In the case of a single keyword, only a single `SET` needs to be queried e.g. `keyword_tweets:redis`. In the case of multiple keywords with the `AND` criterion, the intersection of all the `SET`s is calculated (`SINTERSTORE`) e.g. `keyword_tweets:java` AND `keyword_tweets:nosql`. For multiple keywords with the `OR` criterion, the union of all the `SET`s is calculated (`SUNIONSTORE`) e.g. `keyword_tweets:redis` OR `keyword_tweets:nosql` OR `keyword_tweets:database`
The resulting `SET` contains just the tweet IDs, which are obtained using `SMEMBERS`. The details of each tweet are then fetched from the corresponding `tweet:[tweet_id]` `HASH` (e.g. `tweet:987654321`) using `HGETALL`. These details are serialized and returned to the user in JSON format
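A sketch of how this query flow might look with go-redis follows; the scratch destination key and the helper function names are hypothetical, not the project's actual code

```go
package stats

import "github.com/go-redis/redis"

// findTweetIDs resolves the keyword criteria to a set of tweet IDs
func findTweetIDs(client *redis.Client, keywords []string, op string) ([]string, error) {
	// single keyword: read the SET directly
	if len(keywords) == 1 {
		return client.SMembers("keyword_tweets:" + keywords[0]).Result()
	}

	keys := make([]string, len(keywords))
	for i, kw := range keywords {
		keys[i] = "keyword_tweets:" + kw
	}

	dest := "tmp:query-result" // hypothetical scratch key
	var err error
	if op == "AND" {
		// intersection of all the keyword SETs
		err = client.SInterStore(dest, keys...).Err()
	} else {
		// union of all the keyword SETs
		err = client.SUnionStore(dest, keys...).Err()
	}
	if err != nil {
		return nil, err
	}
	return client.SMembers(dest).Result()
}

// tweetDetails fetches each tweet's HASH via HGETALL
func tweetDetails(client *redis.Client, ids []string) ([]map[string]string, error) {
	tweets := make([]map[string]string, 0, len(ids))
	for _, id := range ids {
		fields, err := client.HGetAll("tweet:" + id).Result()
		if err != nil {
			return nil, err
		}
		tweets = append(tweets, fields)
	}
	return tweets, nil
}
```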
Docker setup
Here is the `docker-compose.yml` for the application
It defines four services
`redis` - this is based off the Docker Hub Redis image
`tweets-ingestion-service` - it is based on a custom Dockerfile

```
FROM golang as build-stage
WORKDIR /go/
RUN go get -u github.com/go-redis/redis && go get -u github.com/gin-gonic/gin && go get -u github.com/dghubble/oauth1 && go get -u github.com/dghubble/go-twitter/twitter
COPY src/ /go/src
RUN cd /go/src && CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o tweets-producer

FROM scratch
COPY --from=build-stage /go/src/tweets-producer /
CMD ["/tweets-producer"]
```
A multi-stage build process is used wherein one image is used for building our Go app and a different image is used as the base for running it - the `golang` Docker Hub image is used for the build process, which results in a single binary (for Linux). Since we have the binary with all dependencies packed in, all we need is a minimal image for running it and thus we use the lightweight `scratch` image for this purpose
`tweets-consumer` and `tweets-stats-service` - the image creation recipes for these services are the same as for `tweets-ingestion-service`
Test drive
Get the project -
git clone https://github.com/abhirockzz/practical-redis.git
cd practical-redis/real-time-twitter-analysis
Create Twitter app
You need to have a Twitter account in order to create a Twitter App. Browse to https://apps.twitter.com/ and click Create New App to start the process
Update docker-compose.yml
After creating the app, update `docker-compose.yml` with the credentials associated with the app you just created (check the Keys and Access Tokens tab)
`TWITTER_CONSUMER_KEY` - populate this with the value in the Consumer Key (API Key) field on the app details page
`TWITTER_CONSUMER_SECRET` - populate this with the value in the Consumer Secret (API Secret) field on the app details page
`TWITTER_ACCESS_TOKEN` - populate this with the value in the Access Token field on the app details page
`TWITTER_ACCESS_TOKEN_SECRET` - populate this with the value in the Access Token Secret field on the app details page
To start the services, run `docker-compose up --build`, and use `docker-compose down -v` to stop them
Replace `DOCKER_IP` with the IP address of your Docker instance, which you can obtain using `docker-machine ip`. In case of Linux or Mac, this might be `localhost`. The port (`8080` or `8081` in this case) is the one specified in `docker-compose.yml`
Start the tweet ingestion service - `curl http://DOCKER_IP:8080/tweets/producer`
To stop the service, use `curl -X DELETE http://DOCKER_IP:8080/tweets/producer`
The ingestion service should now start consuming relevant tweets (as per the keywords specified by the `TWITTER_TRACKED_TERMS` environment variable). You can check Redis to confirm the creation of the `HASH`es and `SET`s created by the consumer service
Execute queries using stats service
As the tweets continue to flow and get processed, you can query the stats service to get tweet data for relevant keywords and dates
Get all tweets with a keyword - e.g. to search for tweets with the keyword `redis`, execute `curl http://DOCKER_IP:8081/tweets?keywords=redis`. You should receive an HTTP `200` response along with the JSON payload (if there are tweets which contain the keyword)
Get tweets with multiple keywords using the `OR` operator - e.g. to search for tweets with keywords `java` or `database`, execute `curl "http://DOCKER_IP:8081/tweets?keywords=java,database&op=OR"`. You should receive an HTTP `200` response along with the JSON payload (if there are tweets which contain the keywords)
Get tweets with multiple keywords using the `AND` operator - e.g. to search for tweets with keywords `java` and `database`, execute `curl "http://DOCKER_IP:8081/tweets?keywords=java,database&op=AND"`. You should receive an HTTP `200` response along with the JSON payload (if there are tweets which contain the keywords)
Get tweets for a specific keyword on a date - e.g. to search for tweets with the keyword `nosql` on 20 June, 2018, execute `curl "http://DOCKER_IP:8081/tweets?keywords=nosql&date=20-06-2018"`. You should receive an HTTP `200` response along with the JSON payload (if there are tweets on the specified date which contain the keyword)
Get tweets for multiple keywords on a date using the `OR` operator - e.g. to search for tweets with keywords `java` or `database` on 20 June, 2018, execute `curl "http://DOCKER_IP:8081/tweets?keywords=java,database&op=OR&date=20-06-2018"`. You should receive an HTTP `200` response along with the JSON payload (if there are tweets on the specified date which contain the keywords)
Get tweets for multiple keywords on a date using the `AND` operator - e.g. to search for tweets with keywords `java` and `database` on 19 June, 2018, execute `curl "http://DOCKER_IP:8081/tweets?keywords=java,database&op=AND&date=19-06-2018"`. You should receive an HTTP `200` response along with the JSON payload (if there are tweets on the specified date which contain the keywords)
Scale out
To increase the number of instances of `tweets-consumer` (e.g. from 1 to 2), use `docker-compose scale tweets-consumer=2`.
An additional container will start up and the tweet processing workload will now be distributed between both instances. To make this easier to confirm, you can check the logs of the `tweets-consumer` service in isolation - `docker-compose logs tweets-consumer` - you should see logs from the `tweets-consumer_2` container (in addition to `tweets-consumer_1`) which was created as a result of the scale out