Wednesday, 13 January 2021

Kafka: Twitter client - streaming tweets that match a list of keywords

 Below is a program that reads tweets from Twitter matching specific keywords (in my case I used the keywords "marklogicgd" and "java") and publishes them to a Kafka topic.

package com.github.vishnu.kafka.twitter1;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Streams tweets that match a list of tracked terms through the Hosebird
 * client and forwards every raw message to a Kafka topic.
 *
 * <p>The Twitter OAuth credentials are read from the environment variables
 * {@code TWITTER_CONSUMER_KEY}, {@code TWITTER_CONSUMER_SECRET},
 * {@code TWITTER_TOKEN} and {@code TWITTER_TOKEN_SECRET}; they must never be
 * committed to source control.
 */
public class TwitterProducer {
    /** Kafka topic that receives the raw tweet payloads. */
    private static final String TOPIC = "twitter_tweets";

    /** Address of the Kafka broker used to bootstrap the producer. */
    private static final String BOOTSTRAP_SERVER = "127.0.0.1:9092";

    /** Capacity of the in-memory queue between the Twitter client and the Kafka loop. */
    private static final int QUEUE_CAPACITY = 1000;

    Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());

    // OAuth 1 credentials, supplied through the environment instead of being
    // hard-coded. They are validated in createTwitterClient().
    String consumerKey = System.getenv("TWITTER_CONSUMER_KEY");
    String consumerSecret = System.getenv("TWITTER_CONSUMER_SECRET");
    String token = System.getenv("TWITTER_TOKEN");
    String secret = System.getenv("TWITTER_TOKEN_SECRET");

    /** Terms to track; add further terms as additional arguments. */
    List<String> terms = Lists.newArrayList("marklogicgd", "java", "censhare");

    public TwitterProducer() {}

    /**
     * Entry point: creates a producer and runs it until the stream ends.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new TwitterProducer().run();
    }

    /**
     * Connects to Twitter, then polls the message queue and sends every tweet
     * to Kafka until the Twitter client reports that it is done or the
     * polling thread is interrupted.
     */
    public void run() {
        logger.info("Setup");
        // Size this queue based on the expected TPS of the stream.
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

        Client client = createTwitterClient(msgQueue);

        // Build the producer BEFORE connecting to Twitter: if producer
        // construction throws, no client is left connected with nothing
        // draining its queue.
        KafkaProducer<String, String> producer = createKafkaProducer();

        // Release both resources when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping application...");
            logger.info("shutting down client from twitter...");
            client.stop();
            logger.info("closing producer ...");
            producer.close();
            logger.info("Done!");
        }));

        // Attempts to establish a connection.
        client.connect();

        // Loop to send tweets to Kafka.
        while (!client.isDone()) {
            String msg;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                logger.warn("Interrupted while waiting for tweets, stopping client", e);
                client.stop();
                // Restore the flag only after stop() so the stop call is not
                // itself affected by the pending interrupt, then leave the loop.
                Thread.currentThread().interrupt();
                break;
            }
            if (msg != null) {
                logger.info(msg);
                producer.send(new ProducerRecord<>(TOPIC, null, msg), (recordMetadata, e) -> {
                    if (e != null) {
                        logger.error("Something bad happened", e);
                    }
                });
            }
        }
        logger.info("End of application");
    }

    /**
     * Builds (but does not connect) a Hosebird client that tracks
     * {@link #terms} and writes every received message to {@code msgQueue}.
     *
     * @param msgQueue queue that receives the raw messages
     * @return the configured, not yet connected client
     * @throws IllegalStateException if any OAuth credential is missing
     */
    public Client createTwitterClient(BlockingQueue<String> msgQueue) {
        // Declare the host to connect to, the endpoint, and the authentication.
        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
        hosebirdEndpoint.trackTerms(terms);

        Authentication hosebirdAuth = new OAuth1(
                requireCredential("TWITTER_CONSUMER_KEY", consumerKey),
                requireCredential("TWITTER_CONSUMER_SECRET", consumerSecret),
                requireCredential("TWITTER_TOKEN", token),
                requireCredential("TWITTER_TOKEN_SECRET", secret));

        ClientBuilder builder = new ClientBuilder()
                .name("Hosebird-Client-01") // optional: mainly for the logs
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        return builder.build();
    }

    /**
     * Creates a Kafka producer with String keys and values that bootstraps
     * from {@link #BOOTSTRAP_SERVER}.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    public KafkaProducer<String, String> createKafkaProducer() {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        return new KafkaProducer<>(properties);
    }

    /** Returns {@code value}, or fails with a message naming the missing setting. */
    private static String requireCredential(String name, String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException(
                    "Missing Twitter credential: set the " + name + " environment variable");
        }
        return value;
    }
}

No comments: