Skip to content

Commit

Permalink
modify src-plugins/logpipe-output-hdfs.c but not test
Browse files Browse the repository at this point in the history
  • Loading branch information
calvin williams committed Dec 26, 2017
1 parent c623290 commit 27a9a6d
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 26 deletions.
2 changes: 1 addition & 1 deletion conf/logpipe_case3_output_hdfs.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@

"outputs" :
[
{ "plugin":"so/logpipe-output-hdfs.so" , "name_node":"192.168.6.21" , "port":9866 , "path":"/log" }
{ "plugin":"so/logpipe-output-hdfs.so" , "name_node":"192.168.6.21" , "port":9000 , "user":"hdfs" , "path":"/log/%Y%M%D" }
]
}
142 changes: 124 additions & 18 deletions src-plugins/logpipe-output-hdfs.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@
make logpipe-output-hdfs.so && cp logpipe-output-hdfs.so ~/so/
*/

/* add to ~/.profile
# for hadoop
export HADOOP_HOME=/home/hdfs/expack/hadoop
export PATH=$HADOOP_HOME/bin:$PATH
export HADOOP_CLASSPATH=`hadoop classpath --glob`
export CLASSPATH=$HADOOP_CLASSPATH:$CLASSPATH
export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native:$LD_LIBRARY_PATH
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64/server:$LD_LIBRARY_PATH
*/

/* add to $HADOOP/etc/hadoop/hdfs-site.xml
<property>
<name>dfs.support.append</name>
<value>true</value>
</property>
*/

char *__LOGPIPE_OUTPUT_FILE_VERSION = "0.1.0" ;

struct OutputPluginContext
Expand Down Expand Up @@ -111,7 +128,6 @@ int InitOutputPluginContext( struct LogpipeEnv *p_env , struct LogpipeOutputPlug
else
{
INFOLOG( "hdfsConnectAsUser ok , name_node[%s] port[%d] user[%s]" , p_plugin_ctx->name_node , p_plugin_ctx->port , p_plugin_ctx->user )
return -1;
}
}
else
Expand All @@ -126,7 +142,6 @@ int InitOutputPluginContext( struct LogpipeEnv *p_env , struct LogpipeOutputPlug
else
{
INFOLOG( "hdfsConnect ok , name_node[%s] port[%d]" , p_plugin_ctx->name_node , p_plugin_ctx->port )
return -1;
}
}
}
Expand All @@ -140,59 +155,150 @@ int OnOutputPluginEvent( struct LogpipeEnv *p_env , struct LogpipeOutputPlugin *
return 0;
}

/* Recursively create an HDFS directory path, like `mkdir -p`.
 * Walks up the path until an existing component (or the empty string) is
 * found, then creates each missing directory on the way back down.
 * Returns 0 on success; -1 if an existing component is not a directory;
 * -2 if the path is too long or contains no '/'; otherwise the non-zero
 * hdfsCreateDirectory() error code.
 */
static int ReMkdirPath( hdfsFS fs , char *path )
{
	hdfsFileInfo	*file_info = NULL ;
	char		up_path[ PATH_MAX + 1 ] ;
	char		*p = NULL ;
	
	int		nret = 0 ;
	
	/* recursion base case: we climbed above the first path component */
	if( path[0] == '\0' )
		return 0;
	
	/* if the path already exists it must be a directory */
	file_info = hdfsGetPathInfo( fs , path ) ;
	if( file_info )
	{
		DEBUGLOG( "hdfs path[%s] exist" , path )
		if( file_info->mKind != kObjectKindDirectory )
		{
			ERRORLOG( "hdfs path[%s] isn't a directory" , path )
			hdfsFreeFileInfo( file_info , 1 );
			return -1;
		}
		hdfsFreeFileInfo( file_info , 1 );
		
		return 0;
	}
	else
	{
		WARNLOG( "hdfs path[%s] not exist" , path )
	}
	
	/* bounded copy: the original unbounded strcpy could overflow up_path
	   when path is longer than PATH_MAX */
	if( strlen(path) > sizeof(up_path)-1 )
	{
		ERRORLOG( "hdfs path[%s] too long" , path )
		return -2;
	}
	strcpy( up_path , path );
	
	/* strip the last component and make sure the parent exists first */
	p = strrchr( up_path , '/' ) ;
	if( p == NULL )
	{
		ERRORLOG( "hdfs path[%s] invalid" , up_path )
		return -2;
	}
	(*p) = '\0' ;
	nret = ReMkdirPath( fs , up_path ) ;
	if( nret )
		return nret;
	
	/* parent now exists; create this directory */
	nret = hdfsCreateDirectory( fs , path ) ;
	if( nret )
	{
		ERRORLOG( "mkdir hdfs path[%s] failed[%d]" , path , nret )
		return nret;
	}
	else
	{
		DEBUGLOG( "mkdir hdfs path[%s] ok" , path )
	}
	
	return 0;
}

funcBeforeWriteOutputPlugin BeforeWriteOutputPlugin ;
/* Called before a batch of log data is written to this output plugin.
 * Expands date/time placeholders in the configured HDFS path (e.g.
 * "/log/%Y%M%D"), creates the target directory if needed, and opens the
 * destination HDFS file, storing the handle in the plugin context.
 * Returns 0 on success, 1 on error (caller treats non-zero as skip/fail).
 * NOTE(review): this span in SOURCE was a rendered diff mixing removed and
 * added lines; this body keeps only the post-change lines.
 */
int BeforeWriteOutputPlugin( struct LogpipeEnv *p_env , struct LogpipeOutputPlugin *p_logpipe_output_plugin , void *p_context , uint16_t filename_len , char *filename )
{
	struct OutputPluginContext	*p_plugin_ctx = (struct OutputPluginContext *)p_context ;
	
	char		path_expand[ PATH_MAX + 1 ] ;
	hdfsFileInfo	*file_info = NULL ;
	
	int		nret = 0 ;
	
	/* remember the source filename for the subsequent write callbacks */
	p_plugin_ctx->filename_len = filename_len ;
	p_plugin_ctx->filename = filename ;
	
	/* expand placeholders in the configured path into path_expand */
	memset( path_expand , 0x00 , sizeof(path_expand) );
	strncpy( path_expand , p_plugin_ctx->path , sizeof(path_expand)-1 );
	nret = ExpandStringBuffer( path_expand , sizeof(path_expand) ) ;
	if( nret )
	{
		ERRORLOG( "ExpandStringBuffer[%s] failed[%d]" , p_plugin_ctx->path , nret )
		return 1;
	}
	else
	{
		DEBUGLOG( "ExpandStringBuffer[%s] ok , path_expand[%s]" , p_plugin_ctx->path , path_expand )
	}
	
	/* make sure the target directory exists; failure is logged but not
	   fatal here — hdfsOpenFile below will report the real error */
	nret = ReMkdirPath( p_plugin_ctx->fs , path_expand );
	if( nret )
	{
		ERRORLOG( "ReMkdirPath[%s] failed[%d]" , path_expand , nret )
	}
	else
	{
		DEBUGLOG( "ReMkdirPath[%s] ok" , path_expand )
	}
	
	/* build the full destination path "<expanded_path>/<filename>" */
	memset( p_plugin_ctx->path_filename , 0x00 , sizeof(p_plugin_ctx->path_filename) );
	snprintf( p_plugin_ctx->path_filename , sizeof(p_plugin_ctx->path_filename)-1 , "%s/%.*s" , path_expand , filename_len , filename );
	
	/* probe whether the destination file already exists (informational) */
	file_info = hdfsGetPathInfo( p_plugin_ctx->fs , p_plugin_ctx->path_filename ) ;
	if( file_info == NULL )
	{
		DEBUGLOG( "file[%s] not exist" , p_plugin_ctx->path_filename )
	}
	else
	{
		DEBUGLOG( "file[%s] exist" , p_plugin_ctx->path_filename )
	}
	
	p_plugin_ctx->file = hdfsOpenFile( p_plugin_ctx->fs , p_plugin_ctx->path_filename , O_CREAT|O_WRONLY , 0 , 0 , 0 ) ;
	/* p_plugin_ctx->file = hdfsOpenFile( p_plugin_ctx->fs , p_plugin_ctx->path_filename , O_WRONLY|O_APPEND , 0 , 0 , 0 ) ; */
	/* p_plugin_ctx->file = hdfsOpenFile( p_plugin_ctx->fs , p_plugin_ctx->path_filename , O_CREAT|O_APPEND , 0 , 0 , 0 ) ; */
	if( p_plugin_ctx->file == NULL )
	{
		ERRORLOG( "hdfsOpenFile[%s] failed , errno[%d]" , p_plugin_ctx->path_filename , errno )
		if( file_info )
		{
			hdfsFreeFileInfo( file_info , 1 );
		}
		return 1;
	}
	else
	{
		DEBUGLOG( "hdfsOpenFile[%s] ok" , p_plugin_ctx->path_filename )
	}
	
	/*
	the seek-to-end (append) logic is disabled in this change; re-enable
	once hdfsOpenFile is switched to an append mode and the cluster has
	dfs.support.append enabled:
	if( file_info && file_info->mSize > 0 )
	{
		nret = hdfsSeek( p_plugin_ctx->fs , p_plugin_ctx->file , file_info->mSize ) ;
		if( nret == -1 )
		{
			ERRORLOG( "hdfsSeek[%s][%d] failed" , p_plugin_ctx->path_filename , file_info->mSize )
			hdfsCloseFile( p_plugin_ctx->fs , p_plugin_ctx->file ); p_plugin_ctx->file = NULL ;
			hdfsFreeFileInfo( file_info , 1 );
			return 1;
		}
		else
		{
			DEBUGLOG( "hdfsSeek[%s][%d] ok" , p_plugin_ctx->path_filename , file_info->mSize )
		}
	}
	*/
	
	if( file_info )
	{
		hdfsFreeFileInfo( file_info , 1 );
	}
	
	return 0;
}

Expand Down
6 changes: 3 additions & 3 deletions src-plugins/makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ LFLAGS = $(_LFLAGS) \
-rdynamic \

###### 额外宏定义区
CLEAN_ADDITION =
CLEAN_ADDITION = logpipe-output-hdfs.so \

###### 加载mktpl模板库
#@ make_all
Expand Down Expand Up @@ -67,12 +67,12 @@ logpipe-output-tcp.so : logpipe-output-tcp.o
logpipe-input-exec.so : logpipe-input-exec.o
$(CC) -o $@ logpipe-input-exec.o $(SOFLAGS) $(LFLAGS)

CFLAGS_hdfs = $(CFLAGS) -I/home/hdfs/hadoop/include
CFLAGS_hdfs = $(CFLAGS) -I/home/hdfs/expack/hadoop/include

logpipe-output-hdfs.o : logpipe-output-hdfs.c
$(CC) $(CFLAGS_hdfs) -c logpipe-output-hdfs.c

LFLAGS_hdfs = $(LFLAGS) -L/home/hdfs/hadoop/lib/native -lhdfs -L$(HOME)/expack/jdk1.8.0_152/jre/lib/amd64/server -ljvm
LFLAGS_hdfs = $(LFLAGS) -L/home/hdfs/expack/hadoop/lib/native -lhdfs -L$(HOME)/expack/jdk1.8.0_152/jre/lib/amd64/server -ljvm

logpipe-output-hdfs.so : logpipe-output-hdfs.o
$(CC) -o $@ logpipe-output-hdfs.o $(SOFLAGS) $(LFLAGS_hdfs)
Expand Down
13 changes: 12 additions & 1 deletion src-plugins/makefile.Linux
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ c_FILE = \
logpipe-input-tcp.c \
logpipe-output-tcp.c \
logpipe-input-exec.c \
logpipe-output-hdfs.c \

###### 目标文件名与安装目录定义区
include makeinstall
Expand All @@ -59,7 +60,7 @@ LFLAGS = $(_LFLAGS) \
-rdynamic \

###### 额外宏定义区
CLEAN_ADDITION =
CLEAN_ADDITION = logpipe-output-hdfs.so \

###### 加载mktpl模板库
# ����Ŀ���б�
Expand Down Expand Up @@ -204,3 +205,13 @@ logpipe-output-tcp.so : logpipe-output-tcp.o
logpipe-input-exec.so : logpipe-input-exec.o
$(CC) -o $@ logpipe-input-exec.o $(SOFLAGS) $(LFLAGS)

CFLAGS_hdfs = $(CFLAGS) -I/home/hdfs/hadoop/include

logpipe-output-hdfs.o : logpipe-output-hdfs.c
$(CC) $(CFLAGS_hdfs) -c logpipe-output-hdfs.c

LFLAGS_hdfs = $(LFLAGS) -L/home/hdfs/hadoop/lib/native -lhdfs -L$(HOME)/expack/jdk1.8.0_152/jre/lib/amd64/server -ljvm

logpipe-output-hdfs.so : logpipe-output-hdfs.o
$(CC) -o $@ logpipe-output-hdfs.o $(SOFLAGS) $(LFLAGS_hdfs)

3 changes: 3 additions & 0 deletions src/logpipe_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ int CompressInputPluginData( char *compress_algorithm , char *block_in_buf , uin
/* 解密输出插件读取的块数据 */
int UncompressInputPluginData( char *uncompress_algorithm , char *block_in_buf , uint32_t block_in_len , char *block_out_buf , uint32_t *p_block_out_len );

/* 字符串展开 */
int ExpandStringBuffer( char *base , int buf_size );

#ifdef __cplusplus
extern }
#endif
Expand Down
6 changes: 3 additions & 3 deletions src/output.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ int WriteAllOutputPlugins( struct LogpipeEnv *p_env , struct LogpipeInputPlugin
if( nret < 0 )
{
ERRORLOG( "[%s]->pfuncBeforeWriteOutputPlugin failed , errno[%d]" , p_logpipe_output_plugin->so_filename , errno )
return -1;
return 1;
}
else if( nret > 0 )
{
Expand Down Expand Up @@ -57,7 +57,7 @@ int WriteAllOutputPlugins( struct LogpipeEnv *p_env , struct LogpipeInputPlugin
else if( nret < 0 )
{
ERRORLOG( "[%s]->pfuncReadInputPlugin failed[%d]" , p_logpipe_input_plugin->so_filename , nret )
return -1;
return 1;
}
else if( nret > 0 )
{
Expand Down Expand Up @@ -109,7 +109,7 @@ int WriteAllOutputPlugins( struct LogpipeEnv *p_env , struct LogpipeInputPlugin
if( nret < 0 )
{
ERRORLOG( "[%s]->pfuncAfterWriteOutputPlugin failed[%d]" , p_logpipe_output_plugin->so_filename , nret )
return -1;
return 1;
}
else if( nret > 0 )
{
Expand Down
Loading

0 comments on commit 27a9a6d

Please sign in to comment.