很多数据开发者使用bitmap技术对用户数据进行编码和压缩,然后利用bitmap的与/或/非的极速处理速度,实现类似用户画像标签的人群筛选、运营分析的7日活跃等分析。
本文给出了一个使用MaxCompute MapReduce开发一个对不同日期活跃用户ID进行bitmap编码和计算的样例。供感兴趣的用户进一步了解、分析,并应用在自己的场景下。


import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Iterator; public class bitmapDemo2
{ public static class BitMapper extends MapperBase { Record key;
Record value;
@Override
public void setup(TaskContext context) throws IOException {
key = context.createMapOutputKeyRecord();
value = context.createMapOutputValueRecord();
} @Override
public void map(long recordNum, Record record, TaskContext context)
throws IOException
{
RoaringBitmap mrb=new RoaringBitmap();
long AID=0;
{
{
{
{
AID=record.getBigint("id");
mrb.add((int) AID);
//获取key
key.set(new Object[] {record.getString("active_date")}); }
}
}
}
ByteBuffer outbb = ByteBuffer.allocate(mrb.serializedSizeInBytes());
mrb.serialize(new DataOutputStream(new OutputStream(){
ByteBuffer mBB;
OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
public void close() {}
public void flush() {}
public void write(int b) {
mBB.put((byte) b);}
public void write(byte[] b) {mBB.put(b);}
public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
}.init(outbb)));
String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
value.set(new Object[] {serializedstring});
context.write(key, value);
}
} public static class BitReducer extends ReducerBase {
private Record result = null; public void setup(TaskContext context) throws IOException {
result = context.createOutputRecord();
} public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
long fcount = 0;
RoaringBitmap rbm=new RoaringBitmap();
while (values.hasNext())
{
Record val = values.next();
ByteBuffer newbb = ByteBuffer.wrap(Base64.getDecoder().decode((String)val.get(0)));
ImmutableRoaringBitmap irb = new ImmutableRoaringBitmap(newbb);
RoaringBitmap p= new RoaringBitmap(irb);
rbm.or(p);
}
ByteBuffer outbb = ByteBuffer.allocate(rbm.serializedSizeInBytes());
rbm.serialize(new DataOutputStream(new OutputStream(){
ByteBuffer mBB;
OutputStream init(ByteBuffer mbb) {mBB=mbb; return this;}
public void close() {}
public void flush() {}
public void write(int b) {
mBB.put((byte) b);}
public void write(byte[] b) {mBB.put(b);}
public void write(byte[] b, int off, int l) {mBB.put(b,off,l);}
}.init(outbb)));
String serializedstring = Base64.getEncoder().encodeToString(outbb.array());
result.set(0, key.get(0));
result.set(1, serializedstring);
context.write(result);
}
}
public static void main( String[] args ) throws OdpsException
{ System.out.println("begin.........");
JobConf job = new JobConf(); job.setMapperClass(BitMapper.class);
job.setReducerClass(BitReducer.class); job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));
job.setMapOutputValueSchema(SchemaUtils.fromString("id:string")); InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);
// +------------+-------------+
// | id | active_date |
// +------------+-------------+
// | 1 | 20190729 |
// | 2 | 20190729 |
// | 3 | 20190730 |
// | 4 | 20190801 |
// | 5 | 20190801 |
// +------------+-------------+
OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);
// +-------------+------------+
// | active_date | bit_map |
// +-------------+------------+
// 20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=3D
// 20190730,OjAAAAEAAAAAAAAAEAAAAAMA
// 20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=3D JobClient.runJob(job);
}
}

对Java应用打包后,上传到MaxCompute项目中,即可在MaxCompute中调用该MR作业,对输入表的数据按日期作为key进行用户id的编码,同时按照相同日期对bitmap后的用户id取OR操作(根据需要可以取AND,例如存留场景),并将处理后的数据写入目标结构表当中供后续处理使用。

本文作者:圣远

原文链接

本文为云栖社区原创内容,未经允许不得转载。

最新文章

  1. Maven命令行使用:mvn clean package(打包)
  2. scoi2010&amp;&amp;bzoj1858序列操作
  3. mybatis.xml文件中#与$符号的区别以及数学符号的处理
  4. 两种流行Spring定时器配置:Java的Timer类和OpenSymphony的Quartz
  5. c#基础系列(转)
  6. sql替换指定字段指定字符串
  7. NSIS打包(一)常用概念简介
  8. DBA_Oracle Table Partition表分区概念汇总(概念)
  9. hdu 5025 Saving Tang Monk(bfs+状态压缩)
  10. LeetCode15 3Sum
  11. 在Visual Studio 2010中使用DSL Tool特定领域开发 开篇
  12. Java [leetcode 33]Search in Rotated Sorted Array
  13. 【java】Java组件概览(2)— 基本库
  14. duilib
  15. Ubuntu 12.04上安装 MongoDB并运行
  16. Android开发 ---基本UI组件7 :分页功能、适配器、滚动条监听事件
  17. ASP.NET Core的身份认证框架IdentityServer4--(5)自定义用户登录(使用官网提供的UI)
  18. 存储过程简单Demo
  19. 分类预测输出precision,recall,accuracy,auc和tp,tn,fp,fn矩阵
  20. HTML 5 audio标签

热门文章

  1. Java-Druid:目录
  2. 【JEECG-Boot 技术文档】新手入门教程
  3. 06_springmvc之参数绑定(pojo和集合)
  4. DOS批处理脚本
  5. 从0开始学习ssh之日志工具与配置log4j
  6. es6 Promise 异步函数调用
  7. Odoo Documentation : Fields
  8. leetcode 49 Group Anagram
  9. Django项目:CMDB(服务器硬件资产自动采集系统)--05--05CMDB采集硬件数据的插件
  10. BZOJ 2165: 大楼