在MaxCompute中利用bitmap进行数据处理
2024-10-08 01:16:21
很多数据开发者使用bitmap技术对用户数据进行编码和压缩,然后利用bitmap的与/或/非的极速处理速度,实现类似用户画像标签的人群筛选、运营分析的7日活跃等分析。
本文给出了一个使用MaxCompute MapReduce开发一个对不同日期活跃用户ID进行bitmap编码和计算的样例。供感兴趣的用户进一步了解、分析,并应用在自己的场景下。
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Iterator;
public class bitmapDemo2
{
public static class BitMapper extends MapperBase {
Record key;
Record value;
@Override
public void setup(TaskContext context) throws IOException {
key = context.createMapOutputKeyRecord();
value = context.createMapOutputValueRecord();
}
@Override
public void map(long recordNum, Record record, TaskContext context)
throws IOException
{
RoaringBitmap mrb=new RoaringBitmap();
long AID=0;
{
{
{
{
AID=record.getBigint("id");
mrb.add((int) AID);
//获取key
key.set(new Object[] {record.getString("active_date")});
}
}
}
}
ByteBuffer outbb = ByteBuffer.allocate(mrb.serializedSizeInBytes());
mrb.serialize(new DataOutputStream(new OutputStream(){
ByteBuffer mBB;
OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
public void close() {}
public void flush() {}
public void write(int b) {
mBB.put((byte) b);}
public void write(byte[] b) {mBB.put(b);}
public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
}.init(outbb)));
String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
value.set(new Object[] {serializedstring});
context.write(key, value);
}
}
public static class BitReducer extends ReducerBase {
private Record result = null;
public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
}
public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
long fcount = 0;
RoaringBitmap rbm=new RoaringBitmap();
while (values.hasNext())
{
Record val = values.next();
ByteBuffer newbb = ByteBuffer.wrap(Base64.getDecoder().decode((String)val.get(0)));
ImmutableRoaringBitmap irb = new ImmutableRoaringBitmap(newbb);
RoaringBitmap p= new RoaringBitmap(irb);
rbm.or(p);
}
ByteBuffer outbb = ByteBuffer.allocate(rbm.serializedSizeInBytes());
rbm.serialize(new DataOutputStream(new OutputStream(){
ByteBuffer mBB;
OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
public void close() {}
public void flush() {}
public void write(int b) {
mBB.put((byte) b);}
public void write(byte[] b) {mBB.put(b);}
public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
}.init(outbb)));
String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
result.set(0, key.get(0));
result.set(1, serializedstring);
context.write(result);
}
}
public static void main( String[] args ) throws OdpsException
{
System.out.println("begin.........");
JobConf job = new JobConf();
job.setMapperClass(BitMapper.class);
job.setReducerClass(BitReducer.class);
job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));
job.setMapOutputValueSchema(SchemaUtils.fromString("id:string"));
InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);
// +------------+-------------+
// | id | active_date |
// +------------+-------------+
// | 1 | 20190729 |
// | 2 | 20190729 |
// | 3 | 20190730 |
// | 4 | 20190801 |
// | 5 | 20190801 |
// +------------+-------------+
OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);
// +-------------+------------+
// | active_date | bit_map |
// +-------------+------------+
// 20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=3D
// 20190730,OjAAAAEAAAAAAAAAEAAAAAMA
// 20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=3D
JobClient.runJob(job);
}
}
对Java应用打包后,上传到MaxCompute项目中,即可在MaxCompute中调用该MR作业,对输入表的数据按日期作为key进行用户id的编码,同时按照相同日期对bitmap后的用户id取OR操作(根据需要可以取AND,例如存留场景),并将处理后的数据写入目标结构表当中供后续处理使用。
本文作者:圣远
本文为云栖社区原创内容,未经允许不得转载。
最新文章
- Maven命令行使用:mvn clean package(打包)
- scoi2010&;&;bzoj1858序列操作
- mybatis.xml文件中#与$符号的区别以及数学符号的处理
- 两种流行Spring定时器配置:Java的Timer类和OpenSymphony的Quartz
- c#基础系列(转)
- sql替换指定字段指定字符串
- NSIS打包(一)常用概念简介
- DBA_Oracle Table Partition表分区概念汇总(概念)
- hdu 5025 Saving Tang Monk(bfs+状态压缩)
- LeetCode15 3Sum
- 在Visual Studio 2010中使用DSL Tool特定领域开发 开篇
- Java [leetcode 33]Search in Rotated Sorted Array
- 【java】Java组件概览(2)— 基本库
- duilib
- Ubuntu 12.04上安装 MongoDB并运行
- Android开发 ---基本UI组件7 :分页功能、适配器、滚动条监听事件
- ASP.NET Core的身份认证框架IdentityServer4--(5)自定义用户登录(使用官网提供的UI)
- 存储过程简单Demo
- 分类预测输出precision,recall,accuracy,auc和tp,tn,fp,fn矩阵
- HTML 5 audio标签