Skip to content

Commit

Permalink
produce message with http
Browse files Browse the repository at this point in the history
  • Loading branch information
adyliu committed Nov 15, 2014
1 parent 0d62077 commit 4319f43
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 20 deletions.
95 changes: 95 additions & 0 deletions src/main/java/com/sohu/jafka/http/HttpRequestHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.sohu.jafka.http;

import com.sohu.jafka.api.ProducerRequest;
import com.sohu.jafka.api.RequestKeys;
import com.sohu.jafka.log.ILog;
import com.sohu.jafka.log.LogManager;
import com.sohu.jafka.message.ByteBufferMessageSet;
import com.sohu.jafka.message.CompressionCodec;
import com.sohu.jafka.message.Message;
import com.sohu.jafka.message.MessageAndOffset;
import com.sohu.jafka.mx.BrokerTopicStat;
import com.sohu.jafka.network.Receive;
import com.sohu.jafka.network.Send;
import com.sohu.jafka.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

import static java.lang.String.format;
import static com.sohu.jafka.utils.Utils.*;
/**
* @author adyliu ([email protected])
* @since 2014-11-14
*/
public class HttpRequestHandler {
protected final Logger logger = LoggerFactory.getLogger(getClass());
final String errorFormat = "Error processing %s on %s:%d";

final LogManager logManager;
public HttpRequestHandler(LogManager logManager){
this.logManager = logManager;
}
public void handle(Map<String,String> args,byte[] data){
RequestKeys requestKey = RequestKeys.valueOf(args.get("request_key"));
ByteBufferMessageSet messageSet = new ByteBufferMessageSet(CompressionCodec.NoCompressionCodec,new Message(data));
final String topic = args.get("topic");
final int partition = getIntInRange(args, "partition", 0, 0, 1024);
switch (requestKey){
case PRODUCE:
produce(topic,partition,messageSet);
break;
default:
break;
}
}

private void produce(String topic,int partition, ByteBufferMessageSet messageSet) {
final long st = System.currentTimeMillis();
ProducerRequest request = new ProducerRequest(topic,partition,messageSet);
if (logger.isDebugEnabled()) {
logger.debug("Producer request " + request.toString());
}
handleProducerRequest(request);
long et = System.currentTimeMillis();
if (logger.isDebugEnabled()) {
logger.debug("produce a message(set) cost " + (et - st) + " ms");
}
}

protected void handleProducerRequest(ProducerRequest request) {
int partition = request.getTranslatedPartition(logManager);
try {
final ILog log = logManager.getOrCreateLog(request.topic, partition);
log.append(request.messages);
long messageSize = request.messages.getSizeInBytes();
if (logger.isDebugEnabled()) {
logger.debug(messageSize + " bytes written to logs " + log);
for (MessageAndOffset m : request.messages) {
logger.trace("wrote message " + m.offset + " to disk");
}
}
BrokerTopicStat.getInstance(request.topic).recordBytesIn(messageSize);
BrokerTopicStat.getBrokerAllTopicStat().recordBytesIn(messageSize);
} catch (RuntimeException e) {
if (logger.isDebugEnabled()) {
logger.error(format(errorFormat, request.getRequestKey(), request.topic, request.partition), e);
} else {
logger.error("Producer failed. " + e.getMessage());
}
BrokerTopicStat.getInstance(request.topic).recordFailedProduceRequest();
BrokerTopicStat.getBrokerAllTopicStat().recordFailedProduceRequest();
throw e;
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.error(format(errorFormat, request.getRequestKey(), request.topic, request.partition), e);
} else {
logger.error("Producer failed. " + e.getMessage());
}
BrokerTopicStat.getInstance(request.topic).recordFailedProduceRequest();
BrokerTopicStat.getBrokerAllTopicStat().recordFailedProduceRequest();
throw new RuntimeException(e.getMessage(), e);
}
}
}
48 changes: 35 additions & 13 deletions src/main/java/com/sohu/jafka/http/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,51 +24,73 @@
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;

/**
* An HTTP server that sends back the content of the received HTTP request
* in a pretty plaintext form.
*/
public class HttpServer {
public class HttpServer extends Thread implements Closeable{

private final int port;

public HttpServer(int port) {
final EventLoopGroup bossGroup = new NioEventLoopGroup();
final EventLoopGroup workerGroup = new NioEventLoopGroup(10);
final HttpRequestHandler handler;
final org.slf4j.Logger logger = LoggerFactory.getLogger(getClass());
//
public HttpServer(int port,HttpRequestHandler handler) {
super("jafka-httpserver");
this.port = port;
this.handler = handler;
}

public void run() throws Exception {
public void run() {
// Configure the server.
BasicConfigurator.configure();
Logger.getRootLogger().setLevel(Level.INFO);
//
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b.option(ChannelOption.SO_BACKLOG, 1024);
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HttpServerInitializer());
.childHandler(new HttpServerInitializer(this));

Channel ch = b.bind(port).sync().channel();
System.err.println("Open your web browser and navigate to " + "http://127.0.0.1:" + port + '/');
//System.err.println("Open your web browser and navigate to " + "http://127.0.0.1:" + port + '/');
logger.info("Jafka HttpServer start at port {}",port);
ch.closeFuture().sync();
} finally {
} catch (InterruptedException ie){
Thread.currentThread().interrupt();
throw new RuntimeException(ie.getMessage(),ie);
}
finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
logger.warn("Jafka HttpServer run over");
}

@Override
public void close() throws IOException {
logger.info("Jafka HttpServer stop port {}",port);
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

public static void main(String[] args) throws Exception {
BasicConfigurator.configure();
Logger.getRootLogger().setLevel(Level.INFO);
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8000;
port = 9093;
}
System.out.println("start server");
new HttpServer(port).run();
new HttpServer(port,null).run();
System.out.println("server stop");
}
}
29 changes: 25 additions & 4 deletions src/main/java/com/sohu/jafka/http/HttpServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.sohu.jafka.http;

import com.sohu.jafka.api.RequestKeys;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
Expand All @@ -38,6 +39,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -51,10 +54,16 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<Object> {

private Logger logger = LoggerFactory.getLogger(getClass());
private HttpRequest request;
final HttpServer server;
public HttpServerHandler(HttpServer server){
this.server = server;
}
/**
* Buffer that stores the response content
*/
private final StringBuilder body = new StringBuilder();
//private final StringBuilder body = new StringBuilder();
private ByteArrayOutputStream body;
private Map<String,String> args = null;

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
Expand All @@ -69,7 +78,8 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except
if (HttpHeaders.is100ContinueExpected(request)) {
send100Continue(ctx);
}
body.setLength(0);
body = new ByteArrayOutputStream(64);
args = new HashMap<String, String>(4);
//
if (request.getMethod() != HttpMethod.POST) {
sendStatusMessage(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED, "POST METHOD REQUIRED");
Expand All @@ -78,21 +88,32 @@ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Except
HttpHeaders headers = request.headers();
String contentType = headers.get("Content-Type");
// 处理 text or octstream
args.put("request_key",headers.get("request_key"));
args.put("topic",headers.get("topic"));
args.put("partition",headers.get("partition"));
}

if (msg instanceof HttpContent) {
HttpContent httpContent = (HttpContent) msg;

ByteBuf content = httpContent.content();
if (content.isReadable()) {
body.append(content.toString(CharsetUtil.UTF_8));
//body.write(content.array());
content.readBytes(body,content.readableBytes());
//body.append(content.toString(CharsetUtil.UTF_8));
}

if (msg instanceof LastHttpContent) {
//process request
if(server.handler != null) {
server.handler.handle(args, body.toByteArray());
}
if (!writeResponse(ctx)) {
// If keep-alive is off, close the connection once the content is fully written.
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
body = null;
args = null;
}
}
}
Expand All @@ -104,7 +125,7 @@ private boolean writeResponse(ChannelHandlerContext ctx) {
// Build the response object.
FullHttpResponse response = new DefaultFullHttpResponse(
HTTP_1_1, OK,
Unpooled.copiedBuffer(body.toString(), CharsetUtil.UTF_8));
Unpooled.copiedBuffer("OK", CharsetUtil.UTF_8));

response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");

Expand Down
6 changes: 5 additions & 1 deletion src/main/java/com/sohu/jafka/http/HttpServerInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import io.netty.handler.codec.http.HttpServerCodec;

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
final HttpServer server;
public HttpServerInitializer(HttpServer server){
this.server = server;
}
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
Expand All @@ -32,6 +36,6 @@ public void initChannel(SocketChannel ch) throws Exception {
p.addLast(new HttpResponseEncoder());
// Remove the following line if you don't want automatic content compression.
//p.addLast(new HttpContentCompressor());
p.addLast(new HttpServerHandler());
p.addLast(new HttpServerHandler(server));
}
}
16 changes: 15 additions & 1 deletion src/main/java/com/sohu/jafka/server/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package com.sohu.jafka.server;

import com.sohu.jafka.http.HttpRequestHandler;
import com.sohu.jafka.http.HttpServer;
import com.sohu.jafka.log.LogManager;
import com.sohu.jafka.mx.ServerInfo;
import com.sohu.jafka.mx.SocketServerStats;
Expand Down Expand Up @@ -54,7 +56,8 @@ public class Server implements Closeable {

final AtomicBoolean isShuttingDown = new AtomicBoolean(false);

SocketServer socketServer;
private SocketServer socketServer;
private HttpServer httpServer;

private final File logDir;

Expand Down Expand Up @@ -92,6 +95,14 @@ public void startup() {
socketServer = new SocketServer(handlers, config);
Utils.registerMBean(socketServer.getStats());
socketServer.startup();
//
final int httpPort = config.getHttpPort();
if(httpPort>0){
HttpRequestHandler httpRequestHandler = new HttpRequestHandler(logManager);
httpServer = new HttpServer(httpPort,httpRequestHandler);
httpServer.start();
}

Mx4jLoader.maybeLoad();
/**
* Registers this broker in ZK. After this, consumers can connect to broker. So
Expand Down Expand Up @@ -120,6 +131,9 @@ public void close() {
socketServer.close();
Utils.unregisterMBean(socketServer.getStats());
}
if(httpServer != null){
httpServer.close();
}
if (logManager != null) {
logManager.close();
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/sohu/jafka/server/ServerConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public ServerConfig(Properties props) {
public int getPort() {
return getInt(props, "port", 9092);
}
public int getHttpPort(){ return getInt(props,"http.port",0);}

/**
* hostname of broker. If not set, will pick up from the value returned
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/com/sohu/jafka/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ public static int getIntInRange(Properties props, String name, int defaultValue,
}
throw new IllegalArgumentException(name + " has value " + v + " which is not in the range");
}
public static int getIntInRange(Map<String,String> props, String name, int defaultValue, int min, int max) {
int v = defaultValue;
if (props.containsKey(name)) {
v = Integer.valueOf(props.get(name));
}
if (v >= min && v <= max) {
return v;
}
throw new IllegalArgumentException(name + " has value " + v + " which is not in the range");
}

public static boolean getBoolean(Properties props, String name, boolean defaultValue) {
if (!props.containsKey(name)) return defaultValue;
Expand Down
4 changes: 3 additions & 1 deletion src/test/java/com/sohu/jafka/producer/ProducerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ public class ProducerTest extends BaseJafkaServer {
*/
@Test
public void testSend() {
Jafka jafka = createJafka();
Properties mainProperties = new Properties();
mainProperties.put("http.port","9093");
Jafka jafka = createJafka(mainProperties);
Properties producerConfig = new Properties();
producerConfig.setProperty("broker.list", "0:localhost:"+jafka.getPort());
producerConfig.setProperty("serializer.class", StringEncoder.class.getName());
Expand Down

0 comments on commit 4319f43

Please sign in to comment.