hadoop 总结

1.hadoop 配置文件 core-site  hdfs-site yarn-site.xml   worker

hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
        <property>
                 <name>dfs.nameservices</name>
                 <value>mycluster</value>
        </property>
        <property>
                <name>dfs.ha.namenodes.mycluster</name>
                <value>nn1,nn2</value>
        </property>
         <property>
                <name>dfs.namenode.rpc-address.mycluster.nn1</name>
                <value>xiemeng-01:9870</value>
        </property>

        <property>
                <name>dfs.namenode.rpc-address.mycluster.nn2</name>
                <value>xiemeng-02:9870</value>
        </property>

        <property>
                <name>dfs.namenode.http-address.mycluster.nn1</name>
                <value>xiemeng-01:50070</value>
        </property>

        <property>
                <name>dfs.namenode.http-address.mycluster.nn2</name>
                <value>xiemeng-02:50070</value>
        </property>

        <property>
                <name>dfs.namenode.shared.edits.dir</name>
                <value>qjournal://xiemeng-01:8485;xiemeng-02:8485;xiemeng-03:8485/mycluster</value>
        </property>

        <!--配置journalnode的工作目录-->
        <property>
                <name>dfs.journalnode.edits.dir</name>
                <value>/home/xiemeng/software/hadoop-3.2.0/journalnode/data</value>
        </property>
    
        <property>
                <name>dfs.client.failover.proxy.provider.mycluster</name>
                <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
        </property>
    
        <property>
                <name>dfs.ha.fencing.methods</name>
                <value>
                        sshfence
                        shell(/bin/true)
                </value>
        </property>
    
        <property>
                <name>dfs.ha.fencing.ssh.private-key-files</name>
                <value>/home/root/.ssh/id_rsa</value>
        </property>
    
        <property>
                <name>dfs.ha.automatic-failover.enabled</name>
                <value>true</value>
        </property>
    
        <property>
                <name>dfs.webhdfs.enabled</name>
                <value>true</value>   
        </property>
 
        <property>
                <name>dfs.name.dir</name>
                <value>/home/xiemeng/software/hadoop-3.2.0/name</value>
        </property>

        <property>
                <name>dfs.data.dir</name>
                <value>/home/xiemeng/software/hadoop-3.2.0/data</value>
        </property>

        <property>
                <name>dfs.replication</name>
                <value>1</value>
        </property>

        <property>
                <name>dfs.journalnode.edits.dir</name>
                <value>/opt/journalnode/data</value>
        </property>

        <property>
                <name>dfs.ha.automatic-failover.enabled</name>
                <value>true</value>
        </property>

        <property>
                <name>dfs.ha.fencing.ssh.connect-timeout</name>
                <value>30000</value>
        </property>
</configuration>

core-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
        <property>
                <name>fs.defaultFS</name>
                <value>hdfs://mycluster</value>
        </property>
        <property>
                <name>dfs.nameservices</name>
                <value>mycluster</value>
        </property>
 
        <property>
                <name>ha.zookeeper.quorum</name>
                <value>192.168.64.128:2181,192.168.64.130:2181,192.168.64.131:2181</value>
        </property>
        <property>
                <name>hadoop.tmp.dir</name>
                <value>/home/xiemeng/software/hadoop-3.2.0/var</value>
        </property>

         <property>
                <name>io.file.buffer.size</name>
                <value>131072</value>
         </property>

         <property>
                <name>ipc.client.connect.max.retries</name>
                <value>100</value>
                <description>Indicates the number of retries a client will make to establisha server connection.</description>
        </property>
 
        <property>
                <name>ipc.client.connect.retry.interval</name>
                <value>10000</value>
                <description>Indicates the number of milliseconds a client will wait forbefore retrying to establish a server connec
tion.
                </description>
        </property>

        <property>
                <name>hadoop.proxyuser.xiemeng.hosts</name>
                <value>*</value>
        </property>
        <property>
                <name>hadoop.proxyuser.xiemeng.groups</name>
                <value>*</value>
        </property>


         <property>
                <name>hadoop.native.lib</name>
                <value>true</value>
                <description>Should native hadoop libraries, if present, be used.</description>
        </property>
 
        <property>
                <name>fs.trash.interval</name>
                <value>1</value>
        </property>

        <property>
                <name>fs.trash.checkpoint.interval</name>
                <value>1</value>
        </property>
</configuration>

yarn-site.xml

<configuration>
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>

  <property>
     <name>yarn.resourcemanager.ha.enabled</name>
     <value>true</value>
  </property>
  
    <property>
     <name>yarn.resourcemanager.cluster-id</name>
     <value>mycluster</value>
  </property>
  
        <property>
     <name>yarn.resourcemanager.ha.rm-ids</name>
     <value>rm1,rm2</value>
  </property>
   
        <property>
       <name>yarn.resourcemanager.hostname.rm1</name>
       <value>xiemeng-01</value>
    </property>

        <property>
       <name>yarn.resourcemanager.hostname.rm2</name>
       <value>xiemeng-02</value>
    </property>

        <property>
       <name>yarn.resourcemanager.webapp.address.rm1</name>
       <value>xiemeng-01:8088</value>
    </property>

        <property>
       <name>yarn.resourcemanager.webapp.address.rm2</name>
       <value>xiemeng-02:8088</value>
    </property>

  <property>
     <name>yarn.resourcemanager.zk-address</name>
     <value>192.168.64.128:2181,192.168.64.130:2181,192.168.64.131:2181</value>
  </property>

     <property>
       <name>yarn.nodemanager.pmem-check-enabled</name>
       <value>false</value>
     </property>

     <!--是否启动一个线程查询每个任务使用的虚拟内存量,如果任务超出内存值直接杀掉,默认为true-->
     <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
     </property>

     <property>
        <name>yarn.resourcemanager.scheduler.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
     </property>

    <!-- 开启标签功能 -->
    <property>
        <name>yarn.node-labels.enabled</name>
        <value>true</value>
    </property>

   <!-- 设置标签存储位置-->
    <property>
       <name>yarn.node-labels.fs-store.root-dir</name>
       <value>hdfs://mycluster/yn/node-labels/</value>
    </property>

    <!-- 开启资源抢占监控 -->
    <property>
       <name>yarn.resourcemanager.scheduler.monitor.enable</name>
       <value>true</value>
    </property>

    <!-- 设置一轮抢占的资源占比,默认为0.1 -->
    <property>
      <name>yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round</name>
      <value>0.3</value>
    </property>
</configuration>

workers

xiemeng-01
xiemeng-02
xiemeng-03
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
        <property>
                <name>mapreduce.framework.name</name>
                <value>yarn</value>
        </property>
        <property>
                <name>mapreduce.application.classpath</name>
                <value>
                 /home/xiemeng/software/hadoop-3.2.0/etc/hadoop,
                  /home/xiemeng/software/hadoop-3.2.0/share/hadoop/common/*,
                  /home/xiemeng/software/hadoop-3.2.0/share/hadoop/common/lib/*,
                  /home/xiemeng/software/hadoop-3.2.0/share/hadoop/hdfs/*,
                  /home/xiemeng/software/hadoop-3.2.0/share/hadoop/hdfs/lib/*,
                  /home/xiemeng/software/hadoop-3.2.0/share/hadoop/mapreduce/*,
                  /home/xiemeng/software/hadoop-3.2.0/share/hadoop/mapreduce/lib/*,
                  /home/xiemeng/software/hadoop-3.2.0/share/hadoop/yarn/*,
                  /home/xiemeng/software/hadoop-3.2.0/share/hadoop/yarn/lib/*
                </value>
        </property>
</configuration>

capacity-scheduler.xml

<configuration>
  <property>
    <name>yarn.scheduler.capacity.maximum-applications</name>
    <value>10000</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <value>0.1</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.resource-calculator</name>
    <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.leaf-queue-template.ordering-policy</name>
    <value>fair</value>
  </property>
<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default,low</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.default.capacity</name>
    <value>40</value>
  </property>

   <property>
    <name>yarn.scheduler.capacity.root.low.capacity</name>
    <value>60</value>
  </property>
 
  <property>
    <name>yarn.scheduler.capacity.root.default.user-limit-factor</name>
    <value>1</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.low.user-limit-factor</name>
    <value>1</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.default.maximum-capacity</name>
    <value>60</value>
  </property>

   <property>
    <name>yarn.scheduler.capacity.root.low.maximum-capacity</name>
    <value>80</value>
  </property>

 
  <property>
    <name>yarn.scheduler.capacity.root.default.default-application-priority</name>
    <value>100</value>
  </property>

  <property>
     <name>yarn.scheduler.capacity.root.low.default-application-priority</name>
 <value>100</value>
   </property>

   <property>
    <name>yarn.scheduler.capacity.root.low.acl_administer_queue</name>
    <value>xiemeng,root</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.low.acl_submit_applications</name>
    <value>xiemeng,root</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.default.acl_administer_queue</name>
    <value>xiemeng,root</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.default.acl_submit_applications</name>
    <value>xiemeng,root</value>
  </property>
</configuration>

Hadoop
启动Hadoop集群: 
Step1 : 在各个JournalNode节点上,输入以下命令启动journalnode服务: sbin/hadoop-daemon.sh start journalnode 
Step2: 在[nn1]上,对其进行格式化,并启动: bin/hdfs namenode -format sbin/hadoop-daemon.sh start namenode 
Step3: 在[nn2]上,同步nn1的元数据信息: hdfs namenode -bootstrapStandby

查看执行任务日志

yarn logs -applicationId   application_1607776903207_0002

2. 基本架构 jobMannager  resourceManager  TaskMananger  一些流程

3.hadoop 命令行操作

hdfs dfs -put [-f] [-p] <localsrc> ... <dst>
hdfs dfs -get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>
hadoop hdfs dfs –put [本地目录] [hadoop目录] 
hadoop fs -mkdir -p < hdfs dir >

3.hadoop java 操作

  Mapper,Reducer,InputFormat OutPutFormat Comparator Partition Comperess

public class WordCountMapper  extends Mapper<LongWritable,Text,Text,LongWritable> {

    /**
     * 初始化
     *
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    /**
     *
     * 用户业务
     *
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String str = value.toString();
        String [] words  = StringUtils.split(str);
        for(String word:words){
            context.write(new Text(word),new LongWritable(1));
        }
    }

    /**
     * 清理资源
     *
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
    }
}
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count =0;
        for(LongWritable value:values){
            count += value.get();
        }
        context.write(key,new LongWritable(count));
    }
}


public class WordCountDriver {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "xiemeng");
        config.set("fs.defaultFS","hdfs://192.168.64.128:9870");
        config.set("mapreduce.framework.name","yarn");
        config.set("yarn.resourcemanager.hostname","192.168.64.128");
        config.set("mapreduce.app-submission.cross-platform", "true");
        config.set("mapreduce.job.jar","file:/D:/code/hadoop-start-demo/target/hadoop-start-demo-1.0-SNAPSHOT.jar");
        try {
            Job job = Job.getInstance(config);
            job.setJarByClass(WordCountDriver.class);
            job.setMapperClass(WordCountMapper.class);
            job.setCombinerClass(WordCountCombiner.class);
            job.setReducerClass(WordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);


            FileInputFormat.setInputPaths(job,new Path("/wordcount/input"));
            FileOutputFormat.setOutputPath(job,new Path("/wordcount2/output"));

            instance.setGroupingComparatorClass(OrderGroupintComparator.class);

            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);


            boolean complete = job.waitForCompletion(true);
            System.exit(complete ? 0:1);
        } catch (Exception e) {
            e.printStackTrace();
        }   
}
public class OrderGroupintComparator extends WritableComparator {

    public OrderGroupintComparator() {
        super(OrderBean.class,true);
    }

    @Override
    public int compare(Object o1, Object o2) {
        OrderBean orderBean = (OrderBean) o1;
        OrderBean orderBean2 = (OrderBean)o2;
        if(orderBean.getOrderId() > orderBean2.getOrderId()){
            return 1;
        }else if(orderBean.getOrderId() < orderBean2.getOrderId()){
            return -1;
        }else {
            return 0;
        }
    }
}
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        CustomWriter customWriter = new CustomWriter(taskAttemptContext);
        return customWriter;
    }

    protected static class CustomWriter extends RecordWriter<Text, NullWritable> {

        private FileSystem fs;

        private FSDataOutputStream fos;

        private TaskAttemptContext context;

        public CustomWriter(TaskAttemptContext context) {
            this.context = context;
        }

        @Override
        public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
            fs = FileSystem.get(context.getConfiguration());
            String key = text.toString();
            Path path = null;
            if (StringUtils.startsWith(key, "137")) {
                path = new Path("file:/D:/hadoop/output/format/out/137/");
            } else {
                path = new Path("file:/D:/hadoop/output/format/out/138/");
            }
            fos = fs.create(path,true);
            byte[] bys = new byte[text.getLength()];
            fos.write(text.toString().getBytes());
        }

        @Override
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            IOUtils.closeQuietly(fos);
            IOUtils.closeQuietly(fs);
        }
    }
}
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        WholeRecordReader reader  = new WholeRecordReader();
        reader.initialize(inputSplit, taskAttemptContext);
        return reader;
    }
}
@Data
public class FlowBeanObj  implements Writable, WritableComparable<FlowBeanObj> {

    private long upFlow;

    private long downFlow;

    private long sumFlow;

    @Override
    public int compareTo(FlowBeanObj o) {
       if(o.getSumFlow() > this.getSumFlow()){
           return -1;
       }else if(o.getSumFlow() < this.getSumFlow()){
           return 1;
       }else {
           return 0;
       }
    }
}

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

    private Configuration config;

    private FileSplit fileSplit;

    private boolean isProgress = true;

    private BytesWritable value = new BytesWritable();

    private Text k = new Text();

    private FileSystem fs;

    private FSDataInputStream fis;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        fileSplit = (FileSplit) inputSplit;
        this.config = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        try {
            if (isProgress) {
                byte[] contents = new byte[(int) fileSplit.getLength()];
                Path path = fileSplit.getPath();
                fs = path.getFileSystem(config);
                fis = fs.open(path);
                IOUtils.readFully(fis,contents, 0,contents.length);
                value.set(contents, 0, contents.length);
                k.set(fileSplit.getPath().toString());
                isProgress = false;
                return true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            IOUtils.closeQuietly(fis);
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
        fis.close();
        fs.close();
    }
}
public class HdfsClient {
    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
        Configuration config = new Configuration();
        config.set("fs.defaultFS","hdfs://localhost:9000");
        config.set("dfs.replication","2");
        FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"),config,"xieme");
        fs.mkdirs(new Path("/hive3"));
        fs.copyFromLocalFile(new Path("file:/d:/elasticsearch.txt") ,new Path("/hive3"));
        fs.copyToLocalFile(false,new Path("/hive3/elasticsearch.txt"), new Path("file:/d:/hive3/elasticsearch2.txt"));
        fs.rename(new Path("/hive3/elasticsearch.txt"),new Path("/hive3/elasticsearch2.txt"));
        RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = fs.listFiles(new Path("/"), true);
        while(locatedFileStatusRemoteIterator.hasNext()){
            LocatedFileStatus next = locatedFileStatusRemoteIterator.next();
            System.out.print(next.getPath().getName()+"\t");
            System.out.print(next.getLen()+"\t");
            System.out.print(next.getGroup()+"\t");
            System.out.print(next.getOwner()+"\t");
            System.out.print(next.getPermission()+"\t");
            System.out.print(next.getPath()+"\t");
            BlockLocation[] blockLocations = next.getBlockLocations();
            for(BlockLocation queue: blockLocations){
                for(String host :queue.getHosts()){
                    System.out.print(host+"\t");
                }
            }
            System.out.println("");
        }*/
        //fs.delete(new Path("/hive3"),true);
        /*FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
        for(FileStatus fileStatus:fileStatuses){
            if(fileStatus.isDirectory()){
                System.out.println(fileStatus.getPath().getName());
            }
        }*/

        // 流copy
        FileInputStream fis = new FileInputStream("d:/elasticsearch.txt");
        FSDataOutputStream fos = fs.create(new Path("/hive/elasticsearch.txt"));
        IOUtils.copyBytes(fis,fos, config);
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);

        FSDataInputStream fis2 = fs.open(new Path("/hive/elasticsearch.txt"));
        FileOutputStream fos2 = new FileOutputStream("d:/elasticsearch.tar.gz.part1");
        fis2.seek(1);
        IOUtils.copyBytes(fis2,fos2,config);
        /*byte [] buf = new byte[1024];
        for(int i=0; i<128;i++){
            while(fis2.read(buf)!=-1){
                fos2.write(buf);
            }
        }*/
        IOUtils.closeStream(fis2);
        IOUtils.closeStream(fos2);
        fs.close();
    }
}

3. hadoop 优化

相关推荐

  1. hadoop 总结

    2024-03-10 16:52:04       46 阅读
  2. Hadoop系列总结

    2024-03-10 16:52:04       43 阅读
  3. 总结Hadoop高可用

    2024-03-10 16:52:04       30 阅读
  4. Hadoop

    2024-03-10 16:52:04       49 阅读
  5. Hadoop

    2024-03-10 16:52:04       43 阅读
  6. <span style='color:red;'>Hadoop</span>

    Hadoop

    2024-03-10 16:52:04      59 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-10 16:52:04       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-10 16:52:04       106 阅读
  3. 在Django里面运行非项目文件

    2024-03-10 16:52:04       87 阅读
  4. Python语言-面向对象

    2024-03-10 16:52:04       96 阅读

热门阅读

  1. 解决:Glide 在回调中再次加载图片报错

    2024-03-10 16:52:04       48 阅读
  2. sql返回数据怎么添加索引

    2024-03-10 16:52:04       40 阅读
  3. 速盾网络:cdn加速技术和云计算的区别

    2024-03-10 16:52:04       43 阅读
  4. adb shell pm 查询设备应用

    2024-03-10 16:52:04       46 阅读
  5. springcloud学习过程错误

    2024-03-10 16:52:04       51 阅读
  6. spring三种配置方式总结

    2024-03-10 16:52:04       40 阅读
  7. 学习笔记 反悔贪心

    2024-03-10 16:52:04       33 阅读
  8. Kafka|处理 Kafka 消息重复的有效措施

    2024-03-10 16:52:04       45 阅读