srs集群下行edge处理逻辑

官方关于源站集群的介绍:

Origin Cluster | SRS

下行边缘是指观众端从边缘edge拉流,边缘edge回源到源站origin节点拉流,然后再

把流转给客户端

边缘处理类SrsPlayEdge

当服务器收到播放请求时,创建对应的consumer消费者。在创建消费者consumer时会判断当前服务器的类型,如果服务器是边缘edge,就通过play_edge进行处理。每一个SrsLiveSource都有一个对应的 SrsPlayEdge *play_edge,如果配置文件指定了remote才开启边缘逻辑。

srs_error_t SrsLiveSource::create_consumer(ISrsConnection* conn, SrsLiveConsumer*& consumer)
{
    srs_error_t err = srs_success;
    
    consumer = new SrsLiveConsumer(this, conn);
    consumers.push_back(consumer);
    if (conn != NULL) {
        conn->srsConsumer = consumer;
    }

    // There should be one consumer, so reset the timeout.
    stream_die_at_ = 0;
    publisher_idle_at_ = 0;
    //通过配置文件中的参数,判断是否是边缘服务器
    //如果是边缘服务器,则调用 play_edge进行拉流播放
    //SrsPlayEdge* play_edge;
    // for edge, when play edge stream, check the state
    if (_srs_config->get_vhost_is_edge(req->vhost)) {
        // notice edge to start for the first client.
        if ((err = play_edge->on_client_play()) != srs_success) {
            return srs_error_wrap(err, "play edge");
        }
    }
    
    return err;
}

SrsPlayEdge会通过SrsEdgeIngester进行拉流

srs_error_t SrsPlayEdge::on_client_play()
{
    srs_error_t err = srs_success;
    //SrsEdgeIngester ingester 启动一个新的协程去源站拉流
    // start ingest when init state.
    if (state == SrsEdgeStateInit) {
        state = SrsEdgeStatePlay;
        err = ingester->start();
    } else if (state == SrsEdgeStateIngestStopping) {
        return srs_error_new(ERROR_RTMP_EDGE_PLAY_STATE, "state is stopping");
    }

    
    return err;
}

拉流类SrsEdgeIngester

SrsEdgeIngester会启动一个协程SrsSTCoroutine进行拉流处理 

srs_error_t SrsEdgeIngester::start()
{
    srs_error_t err = srs_success;
    
    if ((err = source->on_publish()) != srs_success) {
        return srs_error_wrap(err, "notify source");
    }
    
    srs_freep(trd);
    trd = new SrsSTCoroutine("edge-igs", this);
    
    if ((err = trd->start()) != srs_success) {
        return srs_error_wrap(err, "coroutine");
    }
    
    return err;
}

真正拉流类 SrsEdgeUpstream

协程会有一个while循环不停的去拉流,目前边缘回源拉流支持两种协议rtmp和flv,根据配置参数创建对应的拉流对象

srs_error_t SrsEdgeIngester::do_cycle()
{
     while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "do cycle pull");
        }

        // Use protocol in config.
        string edge_protocol = _srs_config->get_vhost_edge_protocol(req->vhost);

        // If follow client protocol, change to protocol of client.
        bool follow_client = _srs_config->get_vhost_edge_follow_client(req->vhost);
        if (follow_client && !req->protocol.empty()) {
            edge_protocol = req->protocol;
        }

        // Create object by protocol.
        srs_freep(upstream);
        //根据边缘协议创建对应的拉流类
        if (edge_protocol == "flv" || edge_protocol == "flvs") {
            upstream = new SrsEdgeFlvUpstream(edge_protocol == "flv"? "http" : "https");
        } else {
            upstream = new SrsEdgeRtmpUpstream(redirect);
        }
        
        if ((err = source->on_source_id_changed(_srs_context->get_id())) != srs_success) {
            return srs_error_wrap(err, "on source id changed");
        }
        //边缘服务连接源站服务,一般源站会部署多个节点,边缘选取源站节点时也是通过RoundRobin算法选取
        //其中一个节点进行拉流
        //这里需要注意一点,如果负载到一台没有流的源站节点上怎么办?
        //其实如果发现连接的源站没有流,会触发302 redirect重连逻辑
        if ((err = upstream->connect(req, lb)) != srs_success) {
            return srs_error_wrap(err, "connect upstream");
        }
        
        if ((err = edge->on_ingest_play()) != srs_success) {
            return srs_error_wrap(err, "notify edge play");
        }

        // set to larger timeout to read av data from origin.
        upstream->set_recv_timeout(SRS_EDGE_INGESTER_TIMEOUT);
        //拉流处理函数
        err = ingest(redirect);
        
        if (srs_is_client_gracefully_close(err)) {
            srs_warn("origin disconnected, retry, error %s", srs_error_desc(err).c_str());
            srs_error_reset(err);
        }
        break;
    }
    
}

拉流源站没有流触发302

边缘服务通过负载均衡获取源站节点 ,然后去源站拉流,如果当前源站节点没有流,会触发320 redirect 重定向另一台。srs目前会重试三次,如果三次之后还是拉不到流,就认为失败了

srs_error_t SrsEdgeFlvUpstream::do_connect(SrsRequest* r, SrsLbRoundRobin* lb, int redirect_depth)
{
    //第一次连接源站节点时 redirect_depth = 0,通过lb->select负载均衡随机选择一台
    //如果连接的源站没有流,触发302,再连接另一台
     if (redirect_depth == 0) {
        SrsConfDirective* conf = _srs_config->get_vhost_edge_origin(req->vhost);

        // @see https://github.com/ossrs/srs/issues/79
        // when origin is error, for instance, server is shutdown,
        // then user remove the vhost then reload, the conf is empty.
        if (!conf) {
            return srs_error_new(ERROR_EDGE_VHOST_REMOVED, "vhost %s removed", req->vhost.c_str());
        }

        // select the origin.
        std::string server = lb->select(conf->args);
        int port = SRS_DEFAULT_HTTP_PORT;
        if (schema_ == "https") {
            port = SRS_DEFAULT_HTTPS_PORT;
        }
        srs_parse_hostport(server, server, port);

        // Remember the current selected server.
        selected_ip = server;
        selected_port = port;
    } else {
        // If HTTP redirect, use the server in location.
        schema_ = req->schema;
        selected_ip = req->host;
        selected_port = req->port;
    }
    
    sdk_ = new SrsHttpClient();
    if ((err = sdk_->initialize(schema_, selected_ip, selected_port, cto)) != srs_success) {
        return srs_error_wrap(err, "edge pull %s failed, cto=%dms.", url.c_str(), srsu2msi(cto));
    }
    if ((err = sdk_->get(path, "", &hr_)) != srs_success) {
        return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());
    }

    if (hr_->status_code() == 404) {
        return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());
    }
    if ((err = sdk_->get(path, "", &hr_)) != srs_success) {
        return srs_error_wrap(err, "edge get %s failed, path=%s", url.c_str(), path.c_str());
    }

    if (hr_->status_code() == 404) {
        return srs_error_new(ERROR_RTMP_STREAM_NOT_FOUND, "Connect to %s, status=%d", url.c_str(), hr_->status_code());
    }
  
    //如果状态码为302,开启重连另一台逻辑
    string location;
    if (hr_->status_code() == 302) {
        //获取302返回的地址
        location = hr_->header()->get("Location");
    }
    srs_trace("Edge: Connect to %s ok, status=%d, location=%s", url.c_str(), hr_->status_code(), location.c_str());

    if (hr_->status_code() == 302) {
        //最多重试三次
        if (redirect_depth >= 3) {
            return srs_error_new(ERROR_HTTP_302_INVALID, "redirect to %s fail, depth=%d", location.c_str(), redirect_depth);
        }

        string app;
        string stream_name;
        if (true) {
            string tcUrl;
            srs_parse_rtmp_url(location, tcUrl, stream_name);

            int port;
            string schema, host, vhost, param;
            srs_discovery_tc_url(tcUrl, schema, host, vhost, app, stream_name, port, param);

            r->schema = schema; r->host = host; r->port = port;
            r->app = app; r->stream = stream_name; r->param = param;
        }
        //重连
        return do_connect(r, lb, redirect_depth + 1);
    }
}

回源拉流的逻辑

边缘节点连接源站成功后,即找到有流的源站,然后就开始通过upstream进行拉流

srs_error_t SrsEdgeIngester::ingest(string& redirect)
{
    while (true) {
        if ((err = trd->pull()) != srs_success) {
            return srs_error_wrap(err, "thread quit");
        }
        
        pprint->elapse();
        
        // pithy print
        if (pprint->can_print()) {
            upstream->kbps_sample(SRS_CONSTS_LOG_EDGE_PLAY, pprint->age());
        }
        
        // read from client.
        SrsCommonMessage* msg = NULL;
        //upstream拉流
        if ((err = upstream->recv_message(&msg)) != srs_success) {
            return srs_error_wrap(err, "recv message");
        }
        
        srs_assert(msg);
        SrsAutoFree(SrsCommonMessage, msg);
        //处理拉到的流
        if ((err = process_publish_message(msg, redirect)) != srs_success) {
            return srs_error_wrap(err, "process message");
        }
    }
}

处理拉到的流,拉到流后和普通单节点就一样了,把流转给 SrsLiveSource ,然后再转给对应的consumer

srs_error_t SrsEdgeIngester::process_publish_message(SrsCommonMessage* msg, string& redirect)
{
    srs_error_t err = srs_success;
    
    // process audio packet
    if (msg->header.is_audio()) {
        if ((err = source->on_audio(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume audio");
        }
    }
    
    // process video packet
    if (msg->header.is_video()) {
        if ((err = source->on_video(msg)) != srs_success) {
            return srs_error_wrap(err, "source consume video");
        }
    }
   }

相关推荐

  1. k8s的CA证书过期处理

    2024-02-22 09:58:01       25 阅读
  2. 算法| ss 逻辑问题

    2024-02-22 09:58:01       26 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-02-22 09:58:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-02-22 09:58:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-02-22 09:58:01       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-02-22 09:58:01       20 阅读

热门阅读

  1. 人工智能与开源机器学习框架

    2024-02-22 09:58:01       31 阅读
  2. [OpenGL教程05 ] glAccum() 函数对累积缓存设置

    2024-02-22 09:58:01       23 阅读
  3. spring缓存的使用

    2024-02-22 09:58:01       28 阅读
  4. 保存Json对象到数据库

    2024-02-22 09:58:01       31 阅读
  5. LeetCode--代码详解 4.寻找两个正序数组的中位数

    2024-02-22 09:58:01       26 阅读
  6. Docker实战之下载Mysql、Redis、Zimg

    2024-02-22 09:58:01       38 阅读
  7. k8s-权限管理

    2024-02-22 09:58:01       32 阅读