API介绍及实现–代理端口检测

了解代码结构后可以从控制层开始了解http的api了.

CheckController.java

api : /proxyipcenter/portCheck
param : ip
descriyption : 根据给出的ip地址,探测其开放的端口号(根据常见的端口号进行测试连通性).
// 控制层
@RequestMapping(value = "/portCheck", method = RequestMethod.GET)
public ResponseEntity<ResponseEnvelope<Object>> checkPort(@RequestParam("ip") String ip) {
    boolean b = NonePortResourceTester.sendIp(ip);
    return ReturnUtil.retSuccess(b);
}
// 业务实现
public static boolean sendIp(final String ip) {
    if (instance == null) {
        logger.warn("port check component not start,ip add failed");
        return false;
    }
    if (!instance.isRunning) {
        logger.info("本机没有启动端口扩展检查组件,将会把它转发到其他服务");
        new Thread() {
            @Override
            public void run() {
                String s = HttpInvoker
                        .get(String.format(SysConfig.getInstance()
                        .get("system.port.test.forward.url"), ip));
                logger.info("port test forward response is:{}", s);
            }
        }.start();
        return true;
    }
    return !StringUtils.isEmpty(ip) && instance.addIp(ip);
}

控制层进行业务层的代码调用,并封装结果返回. 若本机的服务未开启这个服务线程(配置文件的system.thread.portCheckThread), 则通过http请求将ip转发到其他服务(配置文件的system.port.test.forward.url)进行探测,然后直接返回成功. 若开启了端口检测,则接下来进行检测:

public boolean addIp(String ip) {
    if (++addTimes > 6000) {
        bloomFilter = new BloomFilter64bit(60000, 10);
        addTimes = 0;
    }
    return !(ip.startsWith("192.168")
    		 || ip.startsWith("10.") || ip.equals("127.0.0.1"))
    		 && bloomFilter.add(ip)
             && ipTaskQueue.offer(ip);
}

首先进行ip添加次数判断,若超过6000次,则实例化一个6w容量的布隆过滤器. 若ip不是局域网地址,则进行布隆过滤器新增该ip,若布隆过滤器中不存在该ip,则新增到一个ipTaskQueue双向并发阻塞队列.

@Override
public void run() {
    ports = proxyRepository.getPortList();
    isRunning = true;
    if (ports.size() < 100) {// 认为是新启动的系统,执行默认代码
        ports = Lists.newArrayList();
        buildDefaultPort(ports);
    }
    while (isRunning) {
        for (int i = 0; i < 10; i++) {// 每次拿10个IP
            try {
                String ip = ipTaskQueue.take();
                List<Future<List<Proxy>>> futures = Lists.newArrayList();
                futures.add(pool.submit(new PortChecker(ip)));

                List<Proxy> proxies = Lists.newArrayList();
                for (Future<List<Proxy>> future : futures) {
                    proxies.addAll(future.get());
                }
                ResourceFilter.filter(proxies);
                proxyService.save(beanMapper.mapAsList(proxies, ProxyModel.class));
            } catch (Exception e) {
                logger.error("can not take ip from task queue");
            }
        }
    }
}

NonePortResourceTester初始化后,会根据system.thread.portCheckThread判断是否启用,若启用,则开始执行如上的线程方法进行阻塞消费ipTaskQueue. 先从数据库中查询port的分布,若port数少于100则进行构建一个默认的探测port列表buildDefaultPort(List<Integer> ports). 从队列中取得ip后,放到Future任务列表中开始检测所有的可能ports,若指定地址是reachable则返回true,标识该代理来自portTester. 接下来进行ip过滤与去重

// 方法:com.virjar.dungproxy.server.utils.ResourceFilter.filter(List<Proxy>)
public static List<Proxy> filter(List<Proxy> proxys) {
    Iterator<Proxy> iterator = proxys.iterator();
    while (iterator.hasNext()) {
        Proxy proxy = iterator.next();
        if (proxy.getIp() == null || proxy.getPort() == null) {
            iterator.remove();
            continue;
        } else if (bloomFilter.contains(proxy.getIp() + proxy.getPort())) {
            iterator.remove();
            continue;
        } else {
            bloomFilter.add(proxy.getIp() + proxy.getPort());
        }
        if (!proxy.getIp().matches(ipregex)) {
            iterator.remove();
        }
        proxy.setIpValue(ProxyUtil.toIPValue(proxy.getIp()));
    }
    return proxys;
}

去重机制同样使用布隆过滤器进行过滤,该过滤器的容量是100w. 到了这里,就把代理进行入库了.

// com.virjar.dungproxy.server.service.impl.ProxyServiceImpl.save(List<ProxyModel>)
@Override
public void save(List<ProxyModel> draftproxys) {
    for (Proxy proxy : beanMapper.mapAsList(draftproxys, Proxy.class)) {
        if (proxy.getPort() != null) {
            Proxy queryProxy = new Proxy();
            queryProxy.setIp(proxy.getIp());
            queryProxy.setPort(proxy.getPort());
            if (proxyRepo.selectCount(queryProxy) >= 1) {
                ResourceFilter.addConflict(proxy);
            } else {
                proxyRepo.insertSelective(proxy);
            }
        } else {
            NonePortResourceTester.sendIp(proxy.getIp());
        }
    }
}