RabbitMQ 发送消息步骤 & 源码

用户头像
云淡风轻
关注
发布于: 2020 年 05 月 26 日

RabbitTemplate

Helper class that simplifies synchronous RabbitMQ access (sending and receiving messages).

RabbitTemplate API

RabbitTemplate.convertAndSend(String exchange, String routingKey, Object message)



/**
* Convert a Java object to an Amqp {@link Message} and send it to a specific exchange
* with a specific routing key.
*
* @param exchange the name of the exchange to send to
* @param routingKey the routing key to publish with
* @param message the object to convert into a {@link Message} and send
* @throws AmqpException if there is a problem
*/
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;



/**
* Send an already-built {@link Message} to a specific exchange with a specific routing key.
*
* @param exchange the name of the exchange to send to
* @param routingKey the routing key to publish with
* @param message the message to send
* @param correlationData data to correlate publisher confirms
* @throws AmqpException if there is a problem
*/
void send(String exchange, String routingKey, Message message, CorrelationData correlationData)
throws AmqpException;



/**
 * Publish the given message on the supplied channel.
 *
 * <p>Steps, in order: resolve the exchange and routing key (falling back to this
 * template's {@code exchange} / {@code routingKey} fields when the arguments are
 * {@code null}), tag mandatory messages with the return-listener correlation header,
 * run the before-publish post-processors, set up publisher confirms, optionally fill
 * in the user id, publish, and finally commit if the channel is locally transacted.
 *
 * @param channel The RabbitMQ Channel to operate within.
 * @param exchangeArg The name of the RabbitMQ exchange to send to; {@code null} selects the template's exchange.
 * @param routingKeyArg The routing key; {@code null} selects the template's routing key.
 * @param message The Message to send.
 * @param mandatory The mandatory flag.
 * @param correlationData The correlation data.
 * @throws IOException If thrown by RabbitMQ API methods.
 */
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message, // NOSONAR complexity
        boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
    // Explicit arguments win; otherwise use the values configured on this template.
    String targetExchange = (exchangeArg != null) ? exchangeArg : this.exchange;
    String targetRoutingKey = (routingKeyArg != null) ? routingKeyArg : this.routingKey;

    if (logger.isTraceEnabled()) {
        logger.trace("Original message to publish: " + message);
    }

    // The properties reference is captured from the ORIGINAL message, before any
    // post-processor has had the chance to substitute a different Message instance.
    MessageProperties messageProperties = message.getMessageProperties();
    if (mandatory) {
        messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);
    }

    // Each post-processor receives the output of the previous one.
    Message outbound = message;
    if (this.beforePublishPostProcessors != null) {
        for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
            outbound = processor.postProcessMessage(outbound, correlationData);
        }
    }

    setupConfirm(channel, outbound, correlationData);

    // Only evaluate the user-id expression when one is configured and no user id is set yet.
    if (this.userIdExpression != null) {
        if (messageProperties.getUserId() == null) {
            String evaluatedUserId =
                    this.userIdExpression.getValue(this.evaluationContext, outbound, String.class);
            if (evaluatedUserId != null) {
                messageProperties.setUserId(evaluatedUserId);
            }
        }
    }

    if (logger.isDebugEnabled()) {
        logger.debug("Publishing message [" + outbound
                + "] on exchange [" + targetExchange + "], routingKey = [" + targetRoutingKey + "]");
    }

    sendToRabbit(channel, targetExchange, targetRoutingKey, mandatory, outbound);

    // A transacted channel created by this template must be committed here.
    if (isChannelLocallyTransacted(channel)) {
        RabbitUtils.commitIfNecessary(channel);
    }
}



/**
* Publish a message to the given exchange with the given routing key.
*
* Invocations of <code>Channel#basicPublish</code> will eventually block if a
* <a href="https://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
*
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @see <a href="https://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param mandatory true if the 'mandatory' flag is to be set
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;



/** Public API - {@inheritDoc} */
@Override
public void basicPublish(String exchange, String routingKey,
                         boolean mandatory, boolean immediate,
                         BasicProperties props, byte[] body)
    throws IOException
{
    // When sequence numbering is active (counter > 0), remember this publish as
    // unconfirmed and advance the counter before anything is transmitted.
    if (nextPublishSeqNo > 0) {
        unconfirmedSet.add(getNextPublishSeqNo());
        nextPublishSeqNo++;
    }

    // Callers may pass null properties; substitute the minimal default set.
    BasicProperties effectiveProps = (props != null) ? props : MessageProperties.MINIMAL_BASIC;

    AMQCommand publishCommand = new AMQCommand(
        new Basic.Publish.Builder()
            .exchange(exchange)
            .routingKey(routingKey)
            .mandatory(mandatory)
            .immediate(immediate)
            .build(),
        effectiveProps,
        body);

    try {
        transmit(publishCommand);
    } catch (IOException failure) {
        // Record the failed publish, then let the caller see the original exception.
        metricsCollector.basicPublishFailure(this, failure);
        throw failure;
    }
    // Reached only when transmit completed without throwing.
    metricsCollector.basicPublish(this);
}



/**
* Transmit the given command on this channel.
*
* While holding {@code _channelMutex}, first calls {@code ensureIsOpen()} and then
* hands the command to {@code quiescingTransmit}.
*
* @param c the command to transmit
* @throws IOException if an error is encountered
*/
public void transmit(AMQCommand c) throws IOException {
synchronized (_channelMutex) {
// NOTE(review): presumably throws if the channel has already been closed - confirm.
ensureIsOpen();
quiescingTransmit(c);
}
}



/**
 * Writes this command to the connection of the given channel as one or more frames,
 * then flushes the connection.
 *
 * <p>A method without content is written as a single method frame. A method with
 * content is written as a method frame, a content-header frame, and then the body
 * split into fragments no larger than the connection's frame limit allows.
 *
 * @param channel the channel on which to transmit the command
 * @throws IOException if an error is encountered
 * @throws IllegalArgumentException if the content-header frame is larger than the
 *         connection's maximum frame size
 */
public void transmit(AMQChannel channel) throws IOException {
    int channelNumber = channel.getChannelNumber();
    AMQConnection connection = channel.getConnection();

    synchronized (assembler) {
        Method method = this.assembler.getMethod();
        if (!method.hasContent()) {
            // No header or body: the method frame is the whole command.
            connection.writeFrame(method.toFrame(channelNumber));
        } else {
            byte[] content = this.assembler.getContentBody();
            Frame contentHeaderFrame =
                    this.assembler.getContentHeader().toFrame(channelNumber, content.length);

            // A frame max of zero (or less) means the frame size is not capped.
            int negotiatedFrameMax = connection.getFrameMax();
            boolean frameSizeLimited = negotiatedFrameMax > 0;

            // The header is never split, so reject it before anything is written.
            if (frameSizeLimited && contentHeaderFrame.size() > negotiatedFrameMax) {
                String msg = String.format("Content headers exceeded max frame size: %d > %d", contentHeaderFrame.size(), negotiatedFrameMax);
                throw new IllegalArgumentException(msg);
            }

            // Largest body payload per frame: the limit minus the fixed frame overhead,
            // or the whole body in one frame when there is no limit.
            int maxFragment = frameSizeLimited
                    ? negotiatedFrameMax - EMPTY_FRAME_SIZE
                    : content.length;

            connection.writeFrame(method.toFrame(channelNumber));
            connection.writeFrame(contentHeaderFrame);

            int offset = 0;
            while (offset < content.length) {
                int fragmentLength = Math.min(content.length - offset, maxFragment);
                connection.writeFrame(
                        Frame.fromBodyFragment(channelNumber, content, offset, fragmentLength));
                offset += maxFragment;
            }
        }
    }

    connection.flush();
}



/**
* Public API - sends a frame directly to the broker.
*
* The frame is written through {@code _frameHandler}; afterwards the heartbeat
* sender is told that there was activity on the connection.
*
* @param f the frame to write
* @throws IOException if the frame handler fails to write the frame
*/
public void writeFrame(Frame f) throws IOException {
_frameHandler.writeFrame(f);
// Signalled only after the write returned without throwing.
_heartbeatSender.signalActivity();
}

The frame handler writes the AMQP frame to the socket's output stream

/**
* Writes the given frame to {@code _outputStream}.
*
* The write happens while holding the monitor of {@code _outputStream}.
*
* @param frame the frame to write
* @throws IOException if writing to the stream fails
*/
@Override
public void writeFrame(Frame frame) throws IOException {
synchronized (_outputStream) {
frame.writeTo(_outputStream);
}
}



/**
 * Public API - writes this Frame to the given DataOutputStream.
 *
 * <p>Output order: one byte for the frame type, a two-byte channel number, a
 * four-byte payload length, the payload bytes, and finally the frame-end marker.
 * The payload comes from {@code accumulator} when one is present, otherwise from
 * the {@code payload} array.
 *
 * @param os the stream to write the frame to
 * @throws IOException if writing to the stream fails
 */
public void writeTo(DataOutputStream os) throws IOException {
    os.writeByte(type);
    os.writeShort(channel);

    if (accumulator == null) {
        // Payload is held in the byte array.
        os.writeInt(payload.length);
        os.write(payload);
    } else {
        // Payload is held in the accumulator, which writes itself to the stream.
        os.writeInt(accumulator.size());
        accumulator.writeTo(os);
    }

    os.write(AMQP.FRAME_END);
}



用户头像

云淡风轻

关注

云淡风轻 2018.08.18 加入

JAVA软件工程师

评论

发布
暂无评论
RabbitMQ发送消息步骤&源码