hadoop 输出中文乱码问题
2024-10-17 09:50:49
本文转载至:
http://www.aboutyun.com/thread-7358-1-1.html
Hadoop 中涉及文本输出的默认编码统一为不带 BOM 的 UTF-8,而中文 Windows 系统默认使用的编码是 GBK。有些格式的文件(例如 CSV)以不带 BOM 的 UTF-8 编码输出后,用 Excel 打开会显示为乱码,只能用 UE 或者记事本打开才能正常显示。因此,将 Hadoop 的默认输出编码更改为 GBK 是一个非常常见的需求。
默认情况下,MR 主程序中设定输出格式(输出编码也由它决定)的语句为:
- job.setOutputFormatClass(TextOutputFormat.class);
复制代码
- TextOutputFormat.class
复制代码
的代码如下:
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.mapreduce.lib.output;
- import java.io.DataOutputStream;
- import java.io.IOException;
- import java.io.UnsupportedEncodingException;
- import org.apache.hadoop.classification.InterfaceAudience;
- import org.apache.hadoop.classification.InterfaceStability;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.compress.CompressionCodec;
- import org.apache.hadoop.io.compress.GzipCodec;
- import org.apache.hadoop.mapreduce.OutputFormat;
- import org.apache.hadoop.mapreduce.RecordWriter;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.util.*;
- /** An {@link OutputFormat} that writes plain text files.
- * Each record is written as one line: the key, a separator, the value and a
- * newline. The separator is read from the job configuration under
- * {@link #SEPERATOR} and falls back to a tab.
- */
- @InterfaceAudience.Public
- @InterfaceStability.Stable
- public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
- public static String SEPERATOR = "mapreduce.output.textoutputformat.separator"; // configuration key for the key/value separator
- protected static class LineRecordWriter<K, V>
- extends RecordWriter<K, V> {
- private static final String utf8 = "UTF-8"; // article note: change "UTF-8" to "GBK" here
- private static final byte[] newline; // line terminator, encoded once when the class is loaded
- static {
- try {
- newline = "\n".getBytes(utf8);
- } catch (UnsupportedEncodingException uee) {
- throw new IllegalArgumentException("can't find " + utf8 + " encoding");
- }
- }
- protected DataOutputStream out; // destination stream; closed by close()
- private final byte[] keyValueSeparator; // separator bytes, encoded once in the constructor
- public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
- this.out = out;
- try {
- this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
- } catch (UnsupportedEncodingException uee) {
- throw new IllegalArgumentException("can't find " + utf8 + " encoding");
- }
- }
- public LineRecordWriter(DataOutputStream out) { // convenience constructor: tab-separated output
- this(out, "\t");
- }
- /**
- * Write the object to the byte stream, handling Text as a special
- * case: a Text instance has its backing bytes copied straight to the
- * stream, while any other object is rendered with toString() and encoded
- * using the charset named by {@code utf8}.
- * @param o the object to print
- * @throws IOException if the write throws, we pass it on
- */
- private void writeObject(Object o) throws IOException {
- if (o instanceof Text) {
- Text to = (Text) o; // article note: comment this line out
- out.write(to.getBytes(), 0, to.getLength()); // article note: comment this line out
- } else { // article note: comment this line out
- out.write(o.toString().getBytes(utf8));
- }
- }
- public synchronized void write(K key, V value)
- throws IOException {
- boolean nullKey = key == null || key instanceof NullWritable; // NullWritable is treated the same as null
- boolean nullValue = value == null || value instanceof NullWritable;
- if (nullKey && nullValue) { // nothing to emit for this record, not even a newline
- return;
- }
- if (!nullKey) {
- writeObject(key);
- }
- if (!(nullKey || nullValue)) { // separator only when both key and value are present
- out.write(keyValueSeparator);
- }
- if (!nullValue) {
- writeObject(value);
- }
- out.write(newline);
- }
- public synchronized
- void close(TaskAttemptContext context) throws IOException { // context is unused; only the stream is closed
- out.close();
- }
- }
- public RecordWriter<K, V>
- getRecordWriter(TaskAttemptContext job
- ) throws IOException, InterruptedException {
- Configuration conf = job.getConfiguration();
- boolean isCompressed = getCompressOutput(job);
- String keyValueSeparator= conf.get(SEPERATOR, "\t"); // tab when the key is not configured
- CompressionCodec codec = null;
- String extension = ""; // stays empty for uncompressed output
- if (isCompressed) {
- Class<? extends CompressionCodec> codecClass =
- getOutputCompressorClass(job, GzipCodec.class); // GzipCodec is presumably the fallback when no codec is configured
- codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
- extension = codec.getDefaultExtension();
- }
- Path file = getDefaultWorkFile(job, extension);
- FileSystem fs = file.getFileSystem(conf);
- if (!isCompressed) {
- FSDataOutputStream fileOut = fs.create(file, false); // NOTE(review): 'false' is presumably the overwrite flag; confirm against FileSystem.create
- return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
- } else {
- FSDataOutputStream fileOut = fs.create(file, false);
- return new LineRecordWriter<K, V>(new DataOutputStream
- (codec.createOutputStream(fileOut)), // wrap the file stream so records pass through the codec
- keyValueSeparator);
- }
- }
- }
复制代码
从上述代码中 LineRecordWriter 的常量定义 private static final String utf8 = "UTF-8"; 可以看出,hadoop 已经将此输出格式的编码限定为 UTF-8。因此,要改变 hadoop 输出文本的编码,只需定义一个和 TextOutputFormat 基本相同的类 GbkOutputFormat,同样继承 FileOutputFormat(注意是 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat),并把其中的编码改为 GBK 即可,代码如下:
- import java.io.DataOutputStream;
- import java.io.IOException;
- import java.io.UnsupportedEncodingException;
- import org.apache.hadoop.classification.InterfaceAudience;
- import org.apache.hadoop.classification.InterfaceStability;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.compress.CompressionCodec;
- import org.apache.hadoop.io.compress.GzipCodec;
- import org.apache.hadoop.mapreduce.OutputFormat;
- import org.apache.hadoop.mapreduce.RecordWriter;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.util.*;
- @InterfaceAudience.Public
- @InterfaceStability.Stable
- public class GbkOutputFormat<K, V> extends FileOutputFormat<K, V> {
- public static String SEPERATOR = "mapreduce.output.textoutputformat.separator";
- protected static class LineRecordWriter<K, V>
- extends RecordWriter<K, V> {
- private static final String utf8 = "GBK";
- private static final byte[] newline;
- static {
- try {
- newline = "\n".getBytes(utf8);
- } catch (UnsupportedEncodingException uee) {
- throw new IllegalArgumentException("can't find " + utf8 + " encoding");
- }
- }
- protected DataOutputStream out;
- private final byte[] keyValueSeparator;
- public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
- this.out = out;
- try {
- this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
- } catch (UnsupportedEncodingException uee) {
- throw new IllegalArgumentException("can't find " + utf8 + " encoding");
- }
- }
- public LineRecordWriter(DataOutputStream out) {
- this(out, "\t");
- }
- /**
- * Write the object to the byte stream, handling Text as a special
- * case.
- * @param o the object to print
- * @throws IOException if the write throws, we pass it on
- */
- private void writeObject(Object o) throws IOException {
- if (o instanceof Text) {
- // Text to = (Text) o;
- // out.write(to.getBytes(), 0, to.getLength());
- // } else {
- out.write(o.toString().getBytes(utf8));
- }
- }
- public synchronized void write(K key, V value)
- throws IOException {
- boolean nullKey = key == null || key instanceof NullWritable;
- boolean nullValue = value == null || value instanceof NullWritable;
- if (nullKey && nullValue) {
- return;
- }
- if (!nullKey) {
- writeObject(key);
- }
- if (!(nullKey || nullValue)) {
- out.write(keyValueSeparator);
- }
- if (!nullValue) {
- writeObject(value);
- }
- out.write(newline);
- }
- public synchronized
- void close(TaskAttemptContext context) throws IOException {
- out.close();
- }
- }
- public RecordWriter<K, V>
- getRecordWriter(TaskAttemptContext job
- ) throws IOException, InterruptedException {
- Configuration conf = job.getConfiguration();
- boolean isCompressed = getCompressOutput(job);
- String keyValueSeparator= conf.get(SEPERATOR, "\t");
- CompressionCodec codec = null;
- String extension = "";
- if (isCompressed) {
- Class<? extends CompressionCodec> codecClass =
- getOutputCompressorClass(job, GzipCodec.class);
- codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
- extension = codec.getDefaultExtension();
- }
- Path file = getDefaultWorkFile(job, extension);
- FileSystem fs = file.getFileSystem(conf);
- if (!isCompressed) {
- FSDataOutputStream fileOut = fs.create(file, false);
- return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
- } else {
- FSDataOutputStream fileOut = fs.create(file, false);
- return new LineRecordWriter<K, V>(new DataOutputStream
- (codec.createOutputStream(fileOut)),
- keyValueSeparator);
- }
- }
- }
复制代码
最后,在 MR 主程序中将输出格式类设置为 GbkOutputFormat.class,如:
- job.setOutputFormatClass(GbkOutputFormat.class);
复制代码
参考:
- http://semantic.iteye.com/blog/1846238
复制代码
最新文章
- Caring for our seniors
- php综合应用
- 2015CCPC小记
- 简单的poi导出excel文件
- js json与字符串转换
- Asp.net mvc与PHP的Session共享的实现
- VCC、VDD、VEE、VSS等有关电源标注的区别
- J2EE、J2SE、J2ME
- canvas-画图改进版
- Day9 - Python 多线程、进程
- Twitter 新一代流处理利器——Heron 论文笔记之Heron架构
- Kafka项目实践
- spring+springmvc+maven+mybatis整合
- MAC图片格式转换
- Swift の 函数式编程
- 【面向对象设计原则】之依赖倒置原则(DIP)
- zoj2277 The Gate to Freedom
- 用PHPMailer在本地win环境,可以接收到邮件和附件,但在linux环境只能接收邮件信息接不到附件,是我的路
- spring之构造注入
- python升级pip和Django安装