RabbitMQ 发送消息步骤 & 源码
发布于: 2020 年 05 月 26 日
RabbitTemplate
Helper class that simplifies synchronous RabbitMQ access (sending and receiving messages).
RabbitTemplate API
RabbitTemplate.convertAndSend(String exchange, String routingKey, Object message)
/**
 * Convert a Java object to an Amqp {@link Message} and send it to a specific exchange
 * with a specific routing key.
 *
 * @param exchange the name of the exchange
 * @param routingKey the routing key
 * @param message the Java object to convert and send
 * @throws AmqpException if there is a problem
 */
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
/**
 * Send a message to a specific exchange with a specific routing key.
 *
 * @param exchange the name of the exchange
 * @param routingKey the routing key
 * @param message the already-built {@link Message} to send
 * @param correlationData data to correlate publisher confirms
 * @throws AmqpException if there is a problem
 */
void send(String exchange, String routingKey, Message message, CorrelationData correlationData)
        throws AmqpException;
/** * Send the given message to the specified exchange. * * @param channel The RabbitMQ Channel to operate within. * @param exchangeArg The name of the RabbitMQ exchange to send to. * @param routingKeyArg The routing key. * @param message The Message to send. * @param mandatory The mandatory flag. * @param correlationData The correlation data. * @throws IOException If thrown by RabbitMQ API methods. */ public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message, // NOSONAR complexity boolean mandatory, @Nullable CorrelationData correlationData) throws IOException { String exch = exchangeArg; String rKey = routingKeyArg; if (exch == null) { exch = this.exchange; } if (rKey == null) { rKey = this.routingKey; } if (logger.isTraceEnabled()) { logger.trace("Original message to publish: " + message); } Message messageToUse = message; MessageProperties messageProperties = messageToUse.getMessageProperties(); if (mandatory) { messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid); } if (this.beforePublishPostProcessors != null) { for (MessagePostProcessor processor : this.beforePublishPostProcessors) { messageToUse = processor.postProcessMessage(messageToUse, correlationData); } } setupConfirm(channel, messageToUse, correlationData); if (this.userIdExpression != null && messageProperties.getUserId() == null) { String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class); if (userId != null) { messageProperties.setUserId(userId); } } if (logger.isDebugEnabled()) { logger.debug("Publishing message [" + messageToUse + "] on exchange [" + exch + "], routingKey = [" + rKey + "]"); } sendToRabbit(channel, exch, rKey, mandatory, messageToUse); // Check if commit needed if (isChannelLocallyTransacted(channel)) { // Transacted channel created by this template -> commit. RabbitUtils.commitIfNecessary(channel); } }
/**
 * Publish a message.
 *
 * Invocations of <code>Channel#basicPublish</code> will eventually block if a
 * <a href="https://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
 *
 * @see com.rabbitmq.client.AMQP.Basic.Publish
 * @see <a href="https://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
 * @param exchange the exchange to publish the message to
 * @param routingKey the routing key
 * @param mandatory true if the 'mandatory' flag is to be set
 * @param props other properties for the message - routing headers etc
 * @param body the message body
 * @throws java.io.IOException if an error is encountered
 */
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
        throws IOException;
/**
 * Public API - {@inheritDoc}
 *
 * <p>Builds a {@code Basic.Publish} method from the exchange, routing key and the two flags,
 * bundles it with the properties and body into an {@code AMQCommand}, and transmits it.
 * While {@code nextPublishSeqNo} is positive, the current sequence number is added to
 * {@code unconfirmedSet} and the counter is advanced before anything is sent. A {@code null}
 * {@code props} is replaced by {@code MessageProperties.MINIMAL_BASIC}. The metrics collector
 * is told about both a failed and a completed publish.
 */
@Override
public void basicPublish(String exchange, String routingKey,
                         boolean mandatory, boolean immediate,
                         BasicProperties props, byte[] body)
        throws IOException {
    // Record the sequence number of this publish before it goes out.
    if (nextPublishSeqNo > 0) {
        unconfirmedSet.add(getNextPublishSeqNo());
        nextPublishSeqNo++;
    }

    // Fall back to the minimal property set when the caller supplied none.
    BasicProperties effectiveProps = props;
    if (effectiveProps == null) {
        effectiveProps = MessageProperties.MINIMAL_BASIC;
    }

    AMQCommand publishCommand = new AMQCommand(
            new Basic.Publish.Builder()
                    .exchange(exchange)
                    .routingKey(routingKey)
                    .mandatory(mandatory)
                    .immediate(immediate)
                    .build(),
            effectiveProps,
            body);

    try {
        transmit(publishCommand);
    } catch (IOException failure) {
        metricsCollector.basicPublishFailure(this, failure);
        throw failure;
    }
    metricsCollector.basicPublish(this);
}
/**
 * Transmits the given command on this channel.
 *
 * <p>While holding {@code _channelMutex}, calls {@code ensureIsOpen()} and then hands the
 * command to {@code quiescingTransmit}.
 *
 * @param c the command to transmit
 * @throws IOException if thrown by the calls made while transmitting
 */
public void transmit(AMQCommand c) throws IOException {
    synchronized (_channelMutex) {
        ensureIsOpen();
        quiescingTransmit(c);
    }
}
/**
 * Sends this command down the named channel on the channel's
 * connection, possibly in multiple frames.
 *
 * <p>A method without content is written as a single method frame. A method with content is
 * written as a method frame, a content-header frame, and then the body cut into fragments.
 * When the connection reports a positive frame max, each fragment is at most
 * {@code frameMax - EMPTY_FRAME_SIZE} bytes; otherwise the whole body goes out as one
 * fragment. The connection is flushed once all frames are written.
 *
 * @param channel the channel on which to transmit the command
 * @throws IOException if an error is encountered
 * @throws IllegalArgumentException if the frame max is positive and the content-header frame
 * is larger than it
 */
public void transmit(AMQChannel channel) throws IOException {
    final int channelNumber = channel.getChannelNumber();
    final AMQConnection connection = channel.getConnection();

    synchronized (assembler) {
        final Method method = this.assembler.getMethod();
        if (!method.hasContent()) {
            // Nothing but the method itself to send.
            connection.writeFrame(method.toFrame(channelNumber));
        } else {
            final byte[] content = this.assembler.getContentBody();
            final Frame headerFrame =
                    this.assembler.getContentHeader().toFrame(channelNumber, content.length);

            final int frameMax = connection.getFrameMax();
            final boolean frameSizeCapped = frameMax > 0;

            // Validate before writing anything so an oversized header sends no frames at all.
            if (frameSizeCapped && headerFrame.size() > frameMax) {
                throw new IllegalArgumentException(
                        String.format("Content headers exceeded max frame size: %d > %d",
                                headerFrame.size(), frameMax));
            }

            final int maxFragmentLength =
                    frameSizeCapped ? frameMax - EMPTY_FRAME_SIZE : content.length;

            connection.writeFrame(method.toFrame(channelNumber));
            connection.writeFrame(headerFrame);

            int offset = 0;
            while (offset < content.length) {
                final int fragmentLength = Math.min(content.length - offset, maxFragmentLength);
                connection.writeFrame(
                        Frame.fromBodyFragment(channelNumber, content, offset, fragmentLength));
                offset += maxFragmentLength;
            }
        }
    }

    connection.flush();
}
/**
 * Public API - sends a frame directly to the broker.
 *
 * <p>Delegates the write to {@code _frameHandler} and then signals activity to
 * {@code _heartbeatSender}.
 *
 * @param f the frame to write
 * @throws IOException if thrown by the frame handler while writing
 */
public void writeFrame(Frame f) throws IOException {
    _frameHandler.writeFrame(f);
    _heartbeatSender.signalActivity();
}
Writing the AMQP frame to the socket
/**
 * Writes the frame to {@code _outputStream} while holding that stream's monitor,
 * so writes from this method are serialized on the stream.
 *
 * @param frame the frame to write
 * @throws IOException if thrown while the frame is written to the stream
 */
@Override
public void writeFrame(Frame frame) throws IOException {
    synchronized (_outputStream) {
        frame.writeTo(_outputStream);
    }
}
/**
 * Public API - writes this Frame to the given DataOutputStream.
 *
 * <p>Output order: the frame type (one byte), the channel (a short), the payload length
 * (an int), the payload bytes, and finally {@code AMQP.FRAME_END}. The payload comes from
 * {@code accumulator} when it is set, otherwise from the {@code payload} array.
 *
 * @param os the stream to write to
 * @throws IOException if writing to the stream fails
 */
public void writeTo(DataOutputStream os) throws IOException {
    os.writeByte(type);
    os.writeShort(channel);

    if (accumulator == null) {
        // No accumulator: the payload is the plain byte array.
        os.writeInt(payload.length);
        os.write(payload);
    } else {
        os.writeInt(accumulator.size());
        accumulator.writeTo(os);
    }

    os.write(AMQP.FRAME_END);
}
划线
评论
复制
发布于: 2020 年 05 月 26 日阅读数: 62
云淡风轻
关注
云淡风轻 2018.08.18 加入
JAVA软件工程师
评论