聚合文章列表
16 浏览

MQTT——发布报文

发布报文的知识点并不难,只是多。看过前面几章的读者们应该或多或少都认识服务质量QOS。发布报文跟他的联系最紧的。我们也清楚订阅报文里面虽然也有用到QOS,但是他却没有更进一步的联系。往下看就知道是什么一会事了。服务质量QOS不管是订阅报文还是发布报文都会遇到一个问题——报文流失的问题。所以才会有了服务质量这一说法(个人看法)。什么意思呢?就是用来处理流失的问题。即然报文在发送的过程中可能存在流失的问题。那么最直接的决解方案就是重新发送,不就行了吗。所以服务质量事实就是在表示报文要分送几次。QOS有三个值。分别表示着三种不同的处理报文方式。相应的值如下QOS0:最多分发一次。即是可以零次或是一次。不过笔者认零次的情况大多数是不可能存在的。一次到很常见。QOS1:至少分发一次。QOS2:只分发一次。笔者在看MQTT协议文档的时候,对于QOS的分发一直有一个小问题。笔者以为是指客户端到客户端的分发。不过看了几遍觉得他是指客户端到服务端的分发。那么为什么笔者会认为是客户端到客户端的分发呢?主要是笔者认为QOS是指报文到客户端的次

消息系统 2017-11-10 发布
12 浏览

Kafka 源码解析之 Consumer Poll 模型(七)

在上一篇问文章中已经介绍一个Consumer实例如何加入到一个group中,它是ConsumerPoll模型第一步要做的事件,本文会完整讲述一个Consumer实例在poll模型过程中会做哪些事情,只有理解了poll模型才能更好地理解Consumer端的处理逻辑。Consumer示例这里以一个Consumer的实例代码作为开始,一个比较常见的Consumer示例代码如下所示,其主要包含一下几个步骤:构造Propertity,进行consumer相关的配置;创建KafkaConsumer的对象consumer;订阅相应的topic列表;调用consumer的poll方法拉取订阅的消息。前面两步在Consumer底层上只是创建了一个consumer对象,第三步只有记录一下订阅的topic信息,consumer实际的操作都是第四步,也就是在poll方法中实现的,这也是poll模型对于理解consumer设计非常重要的原因。importorg.apa

消息系统 2017-11-11 发布
14 浏览

消息中间件ActiveMQ及Spring整合JMS的介绍

一、消息中间件的基本介绍1.1消息中间件1.1.1什么是消息中间件消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)常见的消息中间件产品:(1)ActiveMQActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMSProvider实现。我们在本次课程中介绍ActiveMQ的使用。(2)RabbitMQAMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。(3)ZeroMQ史上最快的消息队列系统(4)KafkaApache下的一个子项目。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。

消息系统 2017-11-11 发布
AD 友情赞助
17 浏览

activemq基础之:(一)什么是JMS

一、什么是JMSJMS即Java消息服务(JavaMessageService)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持(百度百科给出的概述)。我们可以简单的理解:两个应用程序之间需要进行通信,我们使用一个JMS服务,进行中间的转发,通过JMS的使用,我们可以解除两个程序之间的耦合。二、JMS的优势Asynchronous(异步)JMSisasynchronousbydefault.Sotoreceiveamessage,theclientisnotrequiredtosendtherequest.Themessagewillarriveautomaticallytotheclientastheybecomeavailable.(JMS原本就是一个异步的消息服务,客户端获取消息的时候,不需

消息系统 2017-11-12 发布
17 浏览

activemq基础之:(三)active-admin命令行管理工具

activemq-adminstart(启动localhost)activemq-adminstartxbean:file:E:/apache-activemq-5.7.0/aaa/conf/activemq.xml2、createCreatesarunnablebrokerinstanceinthespecifiedpath创建一个可以运行的代理实例,在指定的路径。做用是创建后,可以起多个实例,每个实例有自己的Queues和Topics,但是实例共用kahadb数据库。例子:activemq-admincreateE:\apache-activemq-5.7.0\aaa3、stopStopsarunningbrokerspecifiedbythebrokername.停止一个指定代理名称的代理。brokername配置在activemq.xml中。例子:activemq-adminstopaaa4、listListsallavailablebr

消息系统 2017-11-12 发布
15 浏览

Java for Web学习笔记(九二):消息和集群(7)RabbitMQ和消息模式(上)

学习目的RabbitMQ是AMQP的一个实现。我们将跟随rabbitMQ官网上的例子,了解AMQP所支持的不同消息队列模式,以及代码如何实现。安装RabbitMQServerUbuntu自带的RabbitMQServer,安装如下sudoapt-getinstallrabbitmq-serversudoservicerabbitmq-serverstartsudorabbitmq-pluginsenablerabbitmq_management开启管理websudoservicerabbitmq-serverrestartsudorabbitmqctladd_usertest123456添加用户test,密码为123456sudorabbitmqctlset_user_tagstesetadministrator将test为设置为administratorsudorabbitmqctlset_permissions-p/test".*"".*"".*"设置权限我们打开ht

消息系统 2017-11-12 发布
AD 友情赞助
15 浏览

Java for Web学习笔记(九三):消息和集群(8)RabbitMQ和消息模式(中)

WorkQueues:竞争消费模式(competingconsumerspattern)简单模式,也可以用于多个client(subsriber)去获取消息,和简单模式对比:我们希望数据能够真正被client处理完,client处理好后应发送Basic.ACK消息给server,如果client因为某个原因未能处理好,当channel关闭,connection关闭或中断的时候,这个消息应该能够分配给其他的client进行处理。我们希望一次只获取一个消息(或者若干),而不是将队列中所有消息都取走,这样才能更好地进行负载均衡。我们希望在server重启后,未处理的消息仍然在队列中。我们还将学习添加AMQP的header。发布的代码,忽略了channel和connection的关闭。可参见[1]ConnectionFactoryfactory=newConnectionFactory();factory.setHost("191.8.1.107");factory.setUsername("test");factory.setP

消息系统 2017-11-12 发布
16 浏览

Java for Web学习笔记(九四):消息和集群(9)RabbitMQ和消息模式(下)

例子:RPC的实现这是一个通过AMQP使用RPC的例子,RPC是个同步的处理,需要等待响应。在实际应用需要特别消息,server可能性能很慢,server可能关闭。我们是否一定要使用RPC,是否可以用异步的方式替代。在此,我们只是演示如何利用RabbitMQ提供的消息管理来实现一个RPC。思路如下:RPCServer从一个队列rpc_queue中接收消息,RPCclient将请求消息发往rpc_queue队列。消息的Properties中Correlation-Id带有这个请求的UUID,用于和响应对应。消息的Properties中Reply-to带有client希望从那个队列中接收这个响应。RPC的调用是同步的,通过BlockingQueue来实现这一个过程。小例子将简单模拟Fibonacci函数RPCServer的代码publicclassFibonacciRPCServer{privatestaticfinalLoggerlog=LogManager.getLogger();privatestatic

消息系统 2017-11-12 发布
21 浏览

Kafka系列(三)Kafka生产者:写消息到Kafka

本系列文章为对《Kafka:TheDefinitiveGuide》的学习整理,希望能够帮助到大家本章我们将会讨论Kafka生产者是如何发送消息到Kafka的。Kafka项目有一个生产者客户端,我们可以通过这个客户端的API来发送消息。生产者客户端是用Java写的,但Kafka写消息的协议是开放的,所以我们也可以自己实现一个非Java语言的客户端。开源的非Java语言客户端见这个wiki。概要当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量?举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消息到Kafka,另一个服务来读取消息并根据规则引擎来检查交易是否通过,将结果通过Kafka返回。对于这样的业务,消息既不能丢失也不能重复,由于交易量大因此吞吐量需要尽可能大,延迟可以稍微高一点。再举个例子,假如我们需要收集用户在网页上的点击数据,对于这样的场景,少量消息丢失或者重复是可以容忍的,延迟多大都不重要只要不影响用户体验,吞吐则根据实时用户

消息系统 2017-11-12 发布
AD 友情赞助
16 浏览

【Kafka源码】broker被选为controller之后的连锁反应

[TOC]今天我们主要分析下broker被选为controller之后,主要干了什么。门面代码先列出来:defonControllerFailover(){if(isRunning){info("Broker%dstartingbecomecontrollerstatetransition".format(config.brokerId))//readcontrollerepochfromzkreadControllerEpochFromZookeeper()//incrementthecontrollerepochincrementControllerEpoch(zkUtils.zkClient)//beforereadingsourceoftruthfromzookeeper,registerthelistenerstogetbroker/topiccallbacksregisterReassignedPartitionsListener()registerIsrChan

消息系统 2017-11-13 发布
17 浏览

解决celery中用户密码带有特殊字符的问题

由于某些原因,我们会在密码中加入一些特殊字符,如标点符号等,来加强密码的强度,当在rabbitmq中使用这类包含问号等特殊字符的密码时,celery却遇到了处理错误的问题。我们编写一个broker的url如下,我们需要连接一个rabbitmq服务器,使用的用户名为admin,密码为mypass?1234:amqp://admin:mypass?1234@test.knktc.com//把这个URL用于以下的代码中:fromceleryimportCelery#configRMQ_URL='amqp://admin:mypass?1234@test.knktc.com//'app=Celery('tasks',broker=RMQ_URL)@app.taskdefadd(x,y):returnx+y一运行就会报以下的错误:ValueError:invalidliteralforint()withbase10:'mypass'原因就是因为密码中带有问号,导致了程序出错。此时解决的方案就是

消息系统 2017-11-13 发布
18 浏览

Celery 源码解析三: Task 对象的实现

Task的实现在Celery中你会发现有两处,一处位于celery/app/task.py,这是第一个;第二个位于celery/task/base.py中,这是第二个。他们之间是有关系的,你可以认为第一个是对外暴露的接口,而第二个是具体的实现!所以,我们由简入繁,先来看看对外的接口:其实这就是个我们声明Task的对象,例如我们使用这么一段代码:我们可以看看add对象是啥:In[1]:addOut[1]:<@task:worker.addoftasks:0x10c9b06d0>你会发现其实他就是我们的一个Task对象,所以你就可以观察一下我们平时使用这个add的形式在里面是如何实现的了,例如我们最常使用的可能就两种方式了,分别是:In[2]:add.delay()In[3]:add.apply_async()其他你看一下源码就会发现他们的实现是一样的,就像这样:我们现在很清楚,调用apply_async是将我们的Task提交到MQ中,然后获得一个c

消息系统 2017-11-13 发布
AD 友情赞助
16 浏览

Kafka构建和部署机器学习

使用ApacheKafka在生产环境构建大规模机器学习智能实时应用为所有行业带来了革命性变化。机器学习及其分支深度学习正蓬勃发展,因为机器学习让计算机能够在无人指引的情况下挖掘深藏的洞见。这种能力正是多种领域所需要的,如非结构化数据分析、图像识别、语音识别和智能决策,这完全不同于传统的编程方式(如Java、.NET或Python)。机器学习并非新生事物,大数据集的出现和处理能力的进步让每一个企业都具备了构建分析模型的能力。各行各业都在将分析模型应用在企业应用和微服务上,用以增长利润、降低成本,或者改善用户体验。这篇文章将介绍机器学习在任务关键型实时系统中的应用,将ApacheKafka作为中心化的、可伸缩的任务关键型系统,同时还将介绍使用KafkaStreamsAPI来构建智能流式应用。可伸缩的任务关键型实时系统互联网、智能手机和持续在线思维的出现改变了人们的行为方式。其中就包括人们对与设备、产品和服务交互方式的期待:人们希望能够实时地获得信息。这也给企业带来了巨大挑战:如何快速地采取行动才能把握先机。批处理系统已经无

消息系统 2017-11-14 发布
10 浏览

Kafka系列(四)Kafka消费者:从Kafka中读取数据

本系列文章为对《Kafka:TheDefinitiveGuide》的学习整理,希望能够帮助到大家应用从Kafka中读取数据需要使用KafkaConsumer订阅主题,然后接收这些主题的消息。在我们深入这些API之前,先来看下几个比较重要的概念。Kafka消费者相关的概念消费者与消费组假设这么个场景:我们从Kafka中读取消息,并且进行检查,最后产生结果数据。我们可以创建一个消费者实例去做这件事情,但如果生产者写入消息的速度比消费者读取的速度快怎么办呢?这样随着时间增长,消息堆积越来越严重。对于这种场景,我们需要增加多个消费者来进行水平扩展。Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个T1主题,该主题有4个分区;同时我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息,如下所示:如果我们增加新的消费者C2到消费组G1,那么每个消费者将会分别收到两个分区的消息,如下所示:如果增加到4个消费者,那么每个消费

消息系统 2017-11-14 发布
13 浏览

Celery 源码解析四: 定时任务的实现

在系列中的第二篇我们已经看过了Celery中的执行引擎是如何执行任务的,并且在第三篇中也介绍了任务的对象,但是,目前我们看到的都是被动的任务执行,也就是说目前执行的任务都是第三方调用发送过来的。可能你会有点奇怪,难道除了第三方调用发送,还有其他的调用发送方?是的,Celery自身也会发送任务,在本文中,你将看到Celery如何利用自身的定时机制运行我们设置得定时任务,并且交给Worker执行。定时任务的定义在开始讲解源码之前,我们不妨先看下我们平常都是怎么定义定时任务的,还是以我们习惯的Demo为例:定义就是这么简单,这么随意,但是,想要执行却是需要我们运行一个定时器,也就是在命令行中启动Beater,正常情况下你这么做就可以了:然后你就会看到一个个的定时任务被发送到MQ中,然后被worker消化。定时任务的启动上面只是举了个如何使用的例子,但是,在Celery内部是如何处理这些任务才是我们需要关心的真正的点。回想一下在我们第一篇中讲Worker的启动流程的文章,有一个很重要

消息系统 2017-11-14 发布
10 浏览

RabbitMQ—Android客户端

由于后台缘故,这里不需要exchange。想要看完整的流程的同学可以去看这篇文章。注意要在子线程中运行ConnectionFactoryfactory=newConnectionFactory();factory.setHost(HOST);//ip地址factory.setPort(PORT);//端口号factory.setUsername("test");factory.setPassword("test");//创建一个新连接Connectionconnection=factory.newConnection();//创建一个通道Channelchannel=connection.createChannel();//声明一个队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//发送消息channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));//消息

消息系统 2017-11-15 发布
16 浏览

Spring Cloud Bus之RabbitMQ初窥

和SpringCloudConfig一样,我们接下来要聊的SpringCloudBus也是微服务架构系统中的必备组件。SpringCloudBus可以将分布式系统的节点与轻量级消息代理链接,然后可以实现广播状态更改(例如配置更改)或广播其他管理指令。SpringCloudBus就像一个分布式执行器,用于扩展的SpringBoot应用程序,但也可以用作应用程序之间的通信通道。那么这里就涉及到了消息代理,目前流行的消息代理中间件有不少,SpringCloudBus支持RabbitMQ和Kafka,本文我们主要来看看RabbitMQ的基本使用。本文是SpringCloud系列的第二十六篇文章,了解前二十五篇文章内容有助于更好的理解本文:1.使用SpringCloud搭建服务注册中心2.使用SpringCloud搭建高可用服务注册中心3.SpringCloud中服务的发现与消费4.Eureka中的核心概念5.什么是客户端负载均衡6.SpringRestTemplate中几种常见的请求

消息系统 2017-11-15 发布
13 浏览

OpenSOC深入剖析

什么是OpenSOC思科在BroCON大会上亮相了其安全大数据分析架构OpenSOC(由Cisco和Hortonworks共同开发),引起了广泛关注。OpenSOC是一个针对网络包和流的大数据分析框架,它是大数据分析与安全分析技术的结合,能够实时的检测网络异常情况并且可以扩展很多节点,它的存储使用开源项目Hadoop,实时索引使用开源项目ElasticSearch,在线流分析使用著名的开源项目Storm。OpenSOC是大数据安全分析的框架设计,对数据中心机排放数据进行消费和监控网络流量。opensoc是可扩展的,目的是在一个大规模的集群上工作。OpenSOC能做什么?可扩展的接收器和分析器能够监视任何Telemetry数据源是一个扩展性很强的框架,且支持各种Telemetry数据流支持对Telemetry数据流的异常检测和基于规则实时告警通过预设时间使用Hadoop存储Telemetry的数据流支持使用ElasticSearch实现自动化实时索引Telemetry数据流支持使用Hive利用SQL查询存储在Hadoop中的数据能够兼

消息系统 2017-11-15 发布
10 浏览

【Kafka源码】处理请求

[TOC]在KafkaServer中的入口在:apis=newKafkaApis(socketServer.requestChannel,replicaManager,groupCoordinator,kafkaController,zkUtils,config.brokerId,config,metadataCache,metrics,authorizer)requestHandlerPool=newKafkaRequestHandlerPool(config.brokerId,socketServer.requestChannel,apis,config.numIoThreads)首先根据相关参数,实例化KafkaApis,然后实例化KafkaRequestHandlerPool。下面我们首先看下KafkaRequestHandlerPool。一、KafkaRequestHandlerPoolclassKafkaRequestHandlerPool(valbrokerId:Int,valr

消息系统 2017-11-15 发布
23 浏览

Android的消息机制@松鼠笔记

前言Android的消息机制主要是指Handler的运行机制。Handler的运行需要底层的MessageQueue消息队列和Looper消息循环的支撑。简单来说,Android的消息机制就是:Handler给MessageQueue添加消息,然后Looper无限循环读取消息,再调用Handler处理消息。下面将从细节方面进行详细描述:为什么会有消息机制消息机制概述消息机制源码分析一、为什么会有消息机制在Android的UI主线程中,不能执行超出5秒的耗时任务,并且所有View和ViewGroup都只能在UI主线程中运行。如果View或者ViewGroup在工作线程中运行,将会抛出【Onlytheoriginalthreadthatcreatedaviewhierarchycantouchitsviews】异常,所以在android中会通过消息机制来解决线程和线程之间的通信问题。二、消息机制概述消息机制由以下四个部分组成:Message消息(数据载体)Handler消息处理器发送消息处理消息Me

消息系统 2017-11-15 发布
AD 友情赞助