Wednesday, 13 January 2021

Kafka:- Twitter_Client - Tracing message from twitter for a list of keywords in message

 Below is the programme where we can get info from twitter tweets for a specific keyword (like in my case i used it for keyword "marklogicgd" & "Java")

package com.github.vishnu.kafka.twitter1;

import com.google.common.collect.Lists;
import com.twitter.hbc.ClientBuilder;
import com.twitter.hbc.core.Client;
import com.twitter.hbc.core.Constants;
import com.twitter.hbc.core.Hosts;
import com.twitter.hbc.core.HttpHosts;
import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
import com.twitter.hbc.core.processor.StringDelimitedProcessor;
import com.twitter.hbc.httpclient.auth.Authentication;
import com.twitter.hbc.httpclient.auth.OAuth1;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TwitterProducer {
Logger logger = LoggerFactory.getLogger(TwitterProducer.class.getName());

String consumerKey = "8uFw2Cs5kAUcSJJu9nSnbRR9N";
String consumerSecret = "d9M5pBI1lGyh7cuAhDN6TaiHCh0dDku1D65q1UkAjV6u8rSEPh";
String token = "156555034-q959i15aLILF8AgORmDWzL0SWMQXeMs05xL85COH";
String secret = "OepA9mu5Xb54ImkLtCmQYQti3onHqOs7bcWhFfX2efkb2";

List<String> terms = Lists.newArrayList("marklogicgd","java","censhare");
//we can add multiple terms as comma separated.


public TwitterProducer(){}

public static void main(String[] args) {
//System.out.println("Hello");

new TwitterProducer().run();




}

public void run(){
logger.info("Setup");
/** Set up your blocking queues: Be sure to size these properly based on expected TPS of your stream */
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(1000);


//create a twitter client
Client client = createTwitterClient(msgQueue);
// Attempts to establish a connection.
client.connect();

//create a kafka producer
KafkaProducer<String, String> producer = createKafkaProducer();

//add a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("stopping application...");
logger.info("shutting down client from twitter...");
client.stop();
logger.info("closing producer ...");
producer.close();
logger.info("Done!");
}

));

//loop to send tweets to kafka
// on a different thread, or multiple different threads....
while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
client.stop();
}
if(msg != null){
logger.info(msg);
producer.send(new ProducerRecord<>("twitter_tweets", null, msg), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null){
logger.error("Something bad happened", e);
}

}
});
}
}
logger.info("End of application");
}

public Client createTwitterClient(BlockingQueue<String> msgQueue){
/** Declare the host you want to connect to, the endpoint, and authentication (basic auth or oauth) */
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
// Optional: set up some followings and track terms
//following people
//List<Long> followings = Lists.newArrayList(1234L, 566788L);
//tracking terms
//List<String> terms = Lists.newArrayList("twitter", "api");
//List<String> terms = Lists.newArrayList("COVID-19");
//hosebirdEndpoint.followings(followings);
hosebirdEndpoint.trackTerms(terms);

// These secrets should be read from a config file
Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, secret);

//creating client
ClientBuilder builder = new ClientBuilder()
.name("Hosebird-Client-01") // optional: mainly for the logs
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue));
//.eventMessageQueue(eventQueue); // optional: use this if you want to process client events

Client hosebirdClient = builder.build();
return hosebirdClient;


}

public KafkaProducer<String, String> createKafkaProducer(){
// System.out.println("Hello Kafka World");
String bootstrapServer = "127.0.01:9092";
//create producer property
Properties properties = new Properties();
//properties.setProperty("bootstrap.servers",bootstrapServer);
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
// properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// properties.setProperty("value.serializer", StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
return producer;

}
}

Tuesday, 29 December 2020

Sleep thread in XSLT (Delay a process by specific time)

 <?xml version="1.0" encoding="UTF-8"?>

<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" xmlns:vish="http://marklogicgd.blogspot.com/xslt/vishnu" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:thread="java.lang.Thread"
    exclude-result-prefixes="xs vish thread" version="2.0">

   
<xsl:variable name="millisecondsToSleep" select="7000"/>

   
<xsl:variable name="startTime" select="current-time()"/>

   
<xsl:function name="vish:threadSleep">
       
<xsl:param name="millisecondsToSleep" as="xs:integer"/>
       
<xsl:value-of select="
               
thread:sleep(if ($millisecondsToSleep gt 0) then
                   
$millisecondsToSleep
               
else
                   
1000)"/>
   
</xsl:function>

   
<xsl:template match="/">
        Start time:
<xsl:value-of select="$startTime"/>
       
<xsl:value-of select="vish:threadSleep($millisecondsToSleep)"/>
       
<xsl:variable name="endTime" select="current-time()"/>
        End time:
<xsl:value-of select="$endTime"/>
        TimeDifference :-
<xsl:value-of select="$endTime - $startTime"/>
   
</xsl:template>

</xsl:stylesheet>



Result:-

<?xml version="1.0" encoding="UTF-8"?>
    Start time: 20:36:22.393+05:30
    End time: 20:36:29.395+05:30
    TimeDifference :- PT7.002