clj-kafka

Clojure library for Kafka.

Development is against the 0.8 release of Kafka. The 0.7 and 0.8 protocols are incompatible, so this release only works when connecting to a 0.8 cluster. If you need to talk to a 0.7 cluster, use an earlier release of clj-kafka.

Installing

Add the following to your Leiningen project.clj:

[clj-kafka "0.1.2-0.8"]
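
For context, a minimal sketch of where that vector goes. The project name, project version, and Clojure version below are placeholders chosen for illustration, not requirements of clj-kafka:

```clojure
;; project.clj for a hypothetical application named my-app.
;; Only the clj-kafka coordinate comes from this README; the rest is assumed.
(defproject my-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [clj-kafka "0.1.2-0.8"]])
```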

Usage

Producer

Discovery of Kafka brokers from Zookeeper:

;; brokers is provided by the clj-kafka.zk namespace
(use 'clj-kafka.zk)

(brokers {"zookeeper.connect" "127.0.0.1:2181"})
;; ({:host "localhost", :jmx_port -1, :port 9999, :version 1})

(use 'clj-kafka.producer)

(def p (producer {"metadata.broker.list" "localhost:9999"
                  "serializer.class" "kafka.serializer.DefaultEncoder"
                  "partitioner.class" "kafka.producer.DefaultPartitioner"}))

(send-message p (message "test" (.getBytes "this is my message")))
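
The broker maps returned by discovery can be turned into the string expected by "metadata.broker.list". The following is a sketch only: broker-list is a hypothetical helper, not part of clj-kafka, and it assumes each broker map has the :host and :port keys shown in the output above.

```clojure
(require '[clojure.string :as string])

;; Hypothetical helper, not part of clj-kafka: joins broker maps into the
;; comma-separated host:port string used by "metadata.broker.list".
(defn broker-list
  [brokers]
  (string/join "," (map (fn [{:keys [host port]}] (str host ":" port))
                        brokers)))

(broker-list [{:host "localhost", :jmx_port -1, :port 9999, :version 1}])
;; => "localhost:9999"
```

With this in place, the result of (brokers ...) could be passed through broker-list and used as the "metadata.broker.list" value instead of a hard-coded address.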

Zookeeper Consumer

The Zookeeper consumer uses the broker information held in Zookeeper to consume messages. It can also commit consumed offsets automatically (the "auto.commit.enable" setting) so that those messages are not retrieved again; the example below turns this off.

(use 'clj-kafka.consumer.zk)
(use 'clj-kafka.core)

(def config {"zookeeper.connect" "localhost:2182"
             "group.id" "clj-kafka.consumer"
             "auto.offset.reset" "smallest"
             "auto.commit.enable" "false"})

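;; take is lazy, so doall realizes both messages before with-resource
;; calls shutdown on the consumer. This blocks until two messages arrive.
(with-resource [c (consumer config)]
  shutdown
  (doall (take 2 (messages c "test"))))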
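
Since the producer example sends the payload as bytes, the consumer side will usually need to decode it. A minimal sketch, assuming each consumed message exposes its payload as a byte array under :value and that the bytes are UTF-8 text; message->string is a hypothetical helper, not part of clj-kafka:

```clojure
;; Hypothetical helper, not part of clj-kafka. Assumes the message payload
;; is a byte array stored under :value and encoded as UTF-8.
(defn message->string
  [m]
  (String. ^bytes (:value m) "UTF-8"))

;; A plain map stands in for a consumed message here.
(message->string {:topic "test"
                  :value (.getBytes "this is my message" "UTF-8")})
;; => "this is my message"
```

Inside with-resource, this could be applied with something like (doall (map message->string (take 2 (messages c "test")))), again under the assumption about :value stated above.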

License

Copyright © 2013 Paul Ingles

Distributed under the Eclipse Public License, the same as Clojure.