/**
* Register an instance to a service in AP mode.
*
* <p>This method creates service or cluster silently if they don't exist.
*
* @param namespaceId id of namespace
* @param serviceName service name
* @param instance instance to register
* @throws Exception any error occurred in the process
*/publicvoidregisterInstance(StringnamespaceId,StringserviceName,Instanceinstance)throwsNacosException{// 创建一个空的service(如果是第一次来注册实例,要先创建一个空service出来,放入注册表)// 此时不包含实例信息createEmptyService(namespaceId,serviceName,instance.isEphemeral());// 拿到创建好的serviceServiceservice=getService(namespaceId,serviceName);// 拿不到则抛异常if(service==null){thrownewNacosException(NacosException.INVALID_PARAM,"service not found, namespace: "+namespaceId+", service: "+serviceName);}// 添加要注册的实例到service中addInstance(namespaceId,serviceName,instance.isEphemeral(),instance);}
publicList<Instance>updateIpAddresses(Serviceservice,Stringaction,booleanephemeral,Instance...ips)throwsNacosException{// 根据namespaceId、serviceName获取当前服务的实例列表,返回值是Datum// 第一次来,肯定是nullDatumdatum=consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),service.getName(),ephemeral));// 得到服务中现有的实例列表List<Instance>currentIPs=service.allIPs(ephemeral);// 创建map,保存实例列表,key为ip地址,value是Instance对象Map<String,Instance>currentInstances=newHashMap<>(currentIPs.size());// 创建Set集合,保存实例的instanceIdSet<String>currentInstanceIds=Sets.newHashSet();// 遍历要现有的实例列表for(Instanceinstance:currentIPs){// 添加到map中currentInstances.put(instance.toIpAddr(),instance);// 添加instanceId到set中currentInstanceIds.add(instance.getInstanceId());}// 创建map,用来保存更新后的实例列表Map<String,Instance>instanceMap;if(datum!=null&&null!=datum.value){// 如果服务中已经有旧的数据,则先保存旧的实例列表instanceMap=setValid(((Instances)datum.value).getInstanceList(),currentInstances);}else{// 如果没有旧数据,则直接创建新的mapinstanceMap=newHashMap<>(ips.length);}// 遍历实例列表for(Instanceinstance:ips){// 判断服务中是否包含要注册的实例的cluster信息if(!service.getClusterMap().containsKey(instance.getClusterName())){// 如果不包含,创建新的clusterClustercluster=newCluster(instance.getClusterName(),service);cluster.init();// 将集群放入service的注册表service.getClusterMap().put(instance.getClusterName(),cluster);Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",instance.getClusterName(),instance.toJson());}// 删除实例 or 新增实例 ?if(UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)){instanceMap.remove(instance.getDatumKey());}else{// 新增实例,instance生成全新的instanceIdInstanceoldInstance=instanceMap.get(instance.getDatumKey());if(oldInstance!=null){instance.setInstanceId(oldInstance.getInstanceId());}else{instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));}// 
放入instance列表instanceMap.put(instance.getDatumKey(),instance);}}if(instanceMap.size()<=0&&UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)){thrownewIllegalArgumentException("ip list can not be empty, service: "+service.getName()+", ip list: "+JacksonUtils.toJson(instanceMap.values()));}// 将instanceMap中的所有实例转为List返回returnnewArrayList<>(instanceMap.values());}
@OverridepublicvoidonChange(Stringkey,Instancesvalue)throwsException{Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}",key,value);// 更新实例列表updateIPs(value.getInstanceList(),KeyBuilder.matchEphemeralInstanceListKey(key));recalculateChecksum();}
publicvoidupdateIPs(Collection<Instance>instances,booleanephemeral){// 准备一个Map,key是cluster,值是集群下的Instance集合Map<String,List<Instance>>ipMap=newHashMap<>(clusterMap.size());// 获取服务的所有cluster名称for(StringclusterName:clusterMap.keySet()){ipMap.put(clusterName,newArrayList<>());}// 遍历要更新的实例for(Instanceinstance:instances){try{if(instance==null){Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");continue;}// 判断实例是否包含clusterName,没有的话用默认clusterif(StringUtils.isEmpty(instance.getClusterName())){instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);}// 判断cluster是否存在,不存在则创建新的clusterif(!clusterMap.containsKey(instance.getClusterName())){Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",instance.getClusterName(),instance.toJson());Clustercluster=newCluster(instance.getClusterName(),this);cluster.init();getClusterMap().put(instance.getClusterName(),cluster);}// 获取当前cluster实例的集合,不存在则创建新的List<Instance>clusterIPs=ipMap.get(instance.getClusterName());if(clusterIPs==null){clusterIPs=newLinkedList<>();ipMap.put(instance.getClusterName(),clusterIPs);}// 添加新的实例到 Instance 集合clusterIPs.add(instance);}catch(Exceptione){Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: "+instance,e);}}for(Map.Entry<String,List<Instance>>entry:ipMap.entrySet()){//make every ip mineList<Instance>entryIPs=entry.getValue();// 将实例集合更新到 clusterMap(注册表)clusterMap.get(entry.getKey()).updateIps(entryIPs,ephemeral);}setLastModifiedMillis(System.currentTimeMillis());// 发布服务变更的通知消息getPushService().serviceChanged(this);StringBuilderstringBuilder=newStringBuilder();for(Instanceinstance:allIPs()){stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");}Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}",getNamespaceId(),getName(),stringBuilder.toString());}
protectedvoidprocessTasks(){Collection<Object>keys=getAllTaskKeys();for(ObjecttaskKey:keys){AbstractDelayTasktask=removeTask(taskKey);if(null==task){continue;}NacosTaskProcessorprocessor=getProcessor(taskKey);if(null==processor){getEngineLog().error("processor not found for task, so discarded. "+task);continue;}try{// 尝试执行同步任务,如果失败会重试if(!processor.process(task)){retryFailedTask(taskKey,task);}}catch(Throwablee){getEngineLog().error("Nacos task execute error : "+e.toString(),e);retryFailedTask(taskKey,task);}}}
@CanDistro@PutMapping("/beat")@Secured(parser=NamingResourceParser.class,action=ActionTypes.WRITE)publicObjectNodebeat(HttpServletRequestrequest)throwsException{// 解析心跳的请求参数ObjectNoderesult=JacksonUtils.createEmptyJsonNode();result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,switchDomain.getClientBeatInterval());Stringbeat=WebUtils.optional(request,"beat",StringUtils.EMPTY);RsInfoclientBeat=null;if(StringUtils.isNotBlank(beat)){clientBeat=JacksonUtils.toObj(beat,RsInfo.class);}StringclusterName=WebUtils.optional(request,CommonParams.CLUSTER_NAME,UtilsAndCommons.DEFAULT_CLUSTER_NAME);Stringip=WebUtils.optional(request,"ip",StringUtils.EMPTY);intport=Integer.parseInt(WebUtils.optional(request,"port","0"));if(clientBeat!=null){if(StringUtils.isNotBlank(clientBeat.getCluster())){clusterName=clientBeat.getCluster();}else{// fix #2533clientBeat.setCluster(clusterName);}ip=clientBeat.getIp();port=clientBeat.getPort();}StringnamespaceId=WebUtils.optional(request,CommonParams.NAMESPACE_ID,Constants.DEFAULT_NAMESPACE_ID);StringserviceName=WebUtils.required(request,CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}",clientBeat,serviceName);// 尝试根据参数中的namespaceId、serviceName、clusterName、ip、port等信息// 从Nacos的注册表中 获取实例Instanceinstance=serviceManager.getInstance(namespaceId,serviceName,clusterName,ip,port);// 如果获取失败,说明心跳失败,实例尚未注册if(instance==null){if(clientBeat==null){result.put(CommonParams.CODE,NamingResponseCode.RESOURCE_NOT_FOUND);returnresult;}Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "+"perform data compensation operations, beat: {}, serviceName: {}",clientBeat,serviceName);// 
这里重新注册一个实例instance=newInstance();instance.setPort(clientBeat.getPort());instance.setIp(clientBeat.getIp());instance.setWeight(clientBeat.getWeight());instance.setMetadata(clientBeat.getMetadata());instance.setClusterName(clusterName);instance.setServiceName(serviceName);instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(clientBeat.isEphemeral());serviceManager.registerInstance(namespaceId,serviceName,instance);}// 尝试基于namespaceId和serviceName从 注册表中获取Service服务Serviceservice=serviceManager.getService(namespaceId,serviceName);// 如果不存在,说明服务不存在,返回404if(service==null){thrownewNacosException(NacosException.SERVER_ERROR,"service not found: "+serviceName+"@"+namespaceId);}if(clientBeat==null){clientBeat=newRsInfo();clientBeat.setIp(ip);clientBeat.setPort(port);clientBeat.setCluster(clusterName);}// 如果心跳没问题,开始处理心跳结果service.processClientBeat(clientBeat);result.put(CommonParams.CODE,NamingResponseCode.OK);if(instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)){result.put(SwitchEntry.CLIENT_BEAT_INTERVAL,instance.getInstanceHeartBeatInterval());}result.put(SwitchEntry.LIGHT_BEAT_ENABLED,switchDomain.isLightBeatEnabled());returnresult;}
@Overridepublicvoidrun(){try{// 找到所有临时实例的列表List<Instance>instances=service.allIPs(true);// first set health status of instances:for(Instanceinstance:instances){// 判断 心跳间隔(当前时间 - 最后一次心跳时间) 是否大于 心跳超时时间,默认15秒if(System.currentTimeMillis()-instance.getLastBeat()>instance.getInstanceHeartBeatTimeOut()){if(!instance.isMarked()){if(instance.isHealthy()){// 如果超时,标记实例为不健康 healthy = falseinstance.setHealthy(false);// 发布实例状态变更的事件getPushService().serviceChanged(service);ApplicationUtils.publishEvent(newInstanceHeartbeatTimeoutEvent(this,instance));}}}}if(!getGlobalConfig().isExpireInstance()){return;}// then remove obsolete instances:for(Instanceinstance:instances){if(instance.isMarked()){continue;}// 判断心跳间隔(当前时间 - 最后一次心跳时间)是否大于 实例被删除的最长超时时间,默认30秒if(System.currentTimeMillis()-instance.getLastBeat()>instance.getIpDeleteTimeout()){// 如果是超过了30秒,则删除实例Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}",service.getName(),JacksonUtils.toJson(instance));deleteIp(instance);}}}catch(Exceptione){Loggers.SRV_LOG.warn("Exception while processing client beat time out.",e);}}
publicvoidcreateServiceIfAbsent(StringnamespaceId,StringserviceName,booleanlocal,Clustercluster)throwsNacosException{// 尝试获取服务Serviceservice=getService(namespaceId,serviceName);if(service==null){// 发现服务不存在,开始创建新服务Loggers.SRV_LOG.info("creating empty service {}:{}",namespaceId,serviceName);service=newService();service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(NamingUtils.getGroupName(serviceName));// now validate the service. if failed, exception will be thrownservice.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();if(cluster!=null){cluster.setService(service);service.getClusterMap().put(cluster.getName(),cluster);}service.validate();// ** 写入注册表并初始化 **putServiceAndInit(service);if(!local){addOrReplaceService(service);}}}
@OverridepublicVoidcall(){// 获取检测任务已经等待的时长longwaited=System.currentTimeMillis()-beat.getStartTime();if(waited>MAX_WAIT_TIME_MILLISECONDS){Loggers.SRV_LOG.warn("beat task waited too long: "+waited+"ms");}SocketChannelchannel=null;try{// 获取实例信息Instanceinstance=beat.getIp();// 通过NIO建立TCP连接channel=SocketChannel.open();channel.configureBlocking(false);// only by setting this can we make the socket close event asynchronouschannel.socket().setSoLinger(false,-1);channel.socket().setReuseAddress(true);channel.socket().setKeepAlive(true);channel.socket().setTcpNoDelay(true);Clustercluster=beat.getTask().getCluster();intport=cluster.isUseIPPort4Check()?instance.getPort():cluster.getDefCkport();channel.connect(newInetSocketAddress(instance.getIp(),port));// 注册连接、读取事件SelectionKeykey=channel.register(selector,SelectionKey.OP_CONNECT|SelectionKey.OP_READ);key.attach(beat);keyMap.put(beat.toString(),newBeatKey(key));beat.setStartTime(System.currentTimeMillis());GlobalExecutor.scheduleTcpSuperSenseTask(newTimeOutTask(key),CONNECT_TIMEOUT_MS,TimeUnit.MILLISECONDS);}catch(Exceptione){beat.finishCheck(false,false,switchDomain.getTcpHealthParams().getMax(),"tcp:error:"+e.getMessage());if(channel!=null){try{channel.close();}catch(Exceptionignore){}}}returnnull;}
/**
 * Receive loop for pushed data: until {@code closed} is set, waits for a UDP datagram, decodes it
 * into a {@code PushPacket} and, for "dom"/"service" packets, hands the data to the host reactor.
 * Exceptions are logged, unless the receiver has been closed, in which case the loop exits.
 *
 * <p>NOTE(review): this excerpt omits the code that builds and sends the ACK, together with the
 * closing brace of the {@code if} block, so the braces below do not balance. Restore the missing
 * part from the full source before compiling.
 */
@Override
public void run() {
    while (!closed) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
            // Receive the pushed data.
            udpSocket.receive(packet);
            // Pass the payload through IoUtils.tryDecompress, decode it as UTF-8 and trim it
            // to obtain the JSON string.
            String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
            // Deserialize the JSON into a push packet.
            PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                // Hand the pushed service data over to the HostReactor.
                hostReactor.processServiceJson(pushPacket.data);
                // send ack to server -- sending the ACK receipt is omitted in this excerpt
        } catch (Exception e) {
            if (closed) {
                return;
            }
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}
/**
* Get all instance of input service.
*
* @param request http request
* @return list of instance
* @throws Exception any error during list
*/@GetMapping("/list")@Secured(parser=NamingResourceParser.class,action=ActionTypes.READ)publicObjectNodelist(HttpServletRequestrequest)throwsException{// 从request中获取namespaceId和serviceNameStringnamespaceId=WebUtils.optional(request,CommonParams.NAMESPACE_ID,Constants.DEFAULT_NAMESPACE_ID);StringserviceName=WebUtils.required(request,CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);Stringagent=WebUtils.getUserAgent(request);Stringclusters=WebUtils.optional(request,"clusters",StringUtils.EMPTY);StringclientIP=WebUtils.optional(request,"clientIP",StringUtils.EMPTY);// 获取客户端的 UDP端口intudpPort=Integer.parseInt(WebUtils.optional(request,"udpPort","0"));Stringenv=WebUtils.optional(request,"env",StringUtils.EMPTY);booleanisCheck=Boolean.parseBoolean(WebUtils.optional(request,"isCheck","false"));Stringapp=WebUtils.optional(request,"app",StringUtils.EMPTY);Stringtenant=WebUtils.optional(request,"tid",StringUtils.EMPTY);booleanhealthyOnly=Boolean.parseBoolean(WebUtils.optional(request,"healthyOnly","false"));// 获取服务列表returndoSrvIpxt(namespaceId,serviceName,agent,clusters,clientIP,udpPort,env,isCheck,app,tenant,healthyOnly);}
/**
 * Builds the JSON response describing the instances of a service and, when the client supplied a
 * UDP port and push can be enabled for its agent, registers the client with the push service.
 *
 * <p>NOTE(review): the middle of this method (result checking, removal of abnormal instances and
 * the construction of {@code hosts}) is omitted in this excerpt, so {@code hosts} is not defined
 * in the visible code. Restore it from the full source before compiling.
 *
 * @param namespaceId id of namespace
 * @param serviceName service name
 * @param agent       user agent of the client
 * @param clusters    value echoed in the response under "clusters" and passed to the push service
 * @param clientIP    client address used, with {@code udpPort}, as the push target
 * @param udpPort     client UDP port; push registration is only attempted when it is positive
 * @param env         value echoed in the response under "env"
 * @param isCheck     not used in the visible part of the method
 * @param app         application name passed to the push service
 * @param tid         tenant id passed to the push service
 * @param healthyOnly not used in the visible part of the method
 * @return response node; when the service is unknown it only carries name, clusters, cacheMillis
 *         and an empty "hosts" array
 * @throws Exception any error raised while building the response
 */
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
        int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Look up the service (null when it is not known).
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();
    // now try to enable the push
    try {
        if (udpPort > 0 && pushService.canEnablePush(agent)) {
            // Register the client's IP and UDP port with the PushService.
            pushService
                    .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                            pushDataSource, tid, app);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        Loggers.SRV_LOG
                .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
        // Push registration failed: fall back to the default cache time.
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }
    if (service == null) {
        // Service not found: answer with an empty host list.
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.put("name", serviceName);
        result.put("clusters", clusters);
        result.put("cacheMillis", cacheMillis);
        result.replace("hosts", JacksonUtils.createEmptyArrayNode());
        return result;
    }
    // Result checking, removal of abnormal instances and related logic are omitted in this excerpt.
    // Finally assemble the result and return it ...
    result.replace("hosts", hosts);
    // Java clients at version 1.0.0 or later receive the service name unchanged under "dom";
    // other clients receive the value produced by NamingUtils.getServiceName.
    if (clientInfo.type == ClientInfo.ClientType.JAVA
            && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
        result.put("dom", serviceName);
    } else {
        result.put("dom", NamingUtils.getServiceName(serviceName));
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}