Pacific-Design.com

    
Home Index

1. Apache Kafka

2. Producer

+ pom.xml

Apache Kafka / Producer /


/* ----------------------------------------------------------------------------------------------------------
 * Download: http://apache.mirrors.pair.com/kafka/0.8.2.1/kafka_2.9.1-0.8.2.1.tgz
 *
 * Check for missing class:
 * jar tvf /opt/kafka_2.9.1-0.8.2.1/libs/kafka-clients-0.8.2.1.jar | grep "org/apache/kafka/common/utils/Utils"
 *
 * Compile : javac SimpleProducer.java 
 * Run     : java SimpleProducer test 3
 *
 -------------------------------------------------------------------------------------------------------------*/

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


public class SimpleProducer {

    private static Producer producer;

    public SimpleProducer() {

        Properties props = new Properties();
        props.put("metadata.broker.list", "127.0.0.1:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer(config);
    }

    public static void main(String[] args) {

        System.out.println("Simple Producer Main");

        int argsCount = args.length;

        if (argsCount == 0 || argsCount == 1)
            throw new IllegalArgumentException("Please provide topic name and Message count as arguments");

        String topic = (String) args[0];
        String count = (String) args[1];
        
        int messageCount = Integer.parseInt(count);
        System.out.println(" Topic Name - " + topic);
        System.out.println(" Message Count - " + messageCount);

        SimpleProducer simpleProducer = new SimpleProducer();
        simpleProducer.publishMessage(topic, messageCount);

    }

    private void publishMessage(String topic, int messageCount) {


        for (int mCount = 0; mCount < messageCount; mCount ++) {

            String runtime = new Date(). toString();
            String msg = "Message Publishing Time - " + runtime; System.out.println(msg);
            KeyedMessage < String, String > data = new KeyedMessage < String, String >(topic, msg);
            producer.send(data);
        }
        producer.close();
    }
}