最近工作需要了解 Web Worker,本着 RTFSC(读源码)的原则,抽空看了一下 Chrome 的 SharedWorker 源码。
SharedWorker是共享Worker:在Chrome的实现中,调用new SharedWorker会分配一个独立进程,不管调用多少次都只有这一个实例。除了进程管理,SharedWorker还需要通信,Chrome中SharedWorker通过MessagePort通信。
SharedWorker.cpp
// 构造函数, inline SharedWorker::SharedWorker(ExecutionContext* context) // 初始化父类AbstrctWorker, // AbstractWorker继承自ActiveDOMObject,因为要在JS中使用,所以Worker需要是一个DOM Object。暂时不关注ActiveDOMObject的实现。 // AbstractWorker有一个方法,resolveURL,对传进来的url进行有效性和安全性检查。 : AbstractWorker(context) //初始化连接标志m_isBeingConnected。 , m_isBeingConnected(false) { } //接下来是SharedWorker真正创建的函数,create: PassRefPtrWillBeRawPtr<SharedWorker> SharedWorker::create(ExecutionContext* context, const String& url, const String& name, ExceptionState& exceptionState) { // SharedWorker是独立进程的 ASSERT(isMainThread()); // 因为目前还不支持worker与worker之间通信,context必须是JS中的document ASSERT_WITH_SECURITY_IMPLICATION(context->isDocument()); // 引用计数,SharedWorker会被多次引用,所以需要引用计数,以便最后一个引用退出时析构。 UseCounter::count(context, UseCounter::SharedWorkerStart); // 使用构造函数构造一个worker实例。 RefPtrWillBeRawPtr<SharedWorker> worker = adoptRefWillBeNoop(new SharedWorker(context));// SharedWorker的构造函数并没有做太多的事情,初始化父类AbstractWorker,以及一个标志,是否连接 // 通信管道,MessageChannel放到后面看。 MessageChannel* channel = MessageChannel::create(context); // port是从channel中拿到的,管道中用来通信的 worker->m_port = channel->port1(); OwnPtr<WebMessagePortChannel> remotePort = channel->port2()->disentangle(); ASSERT(remotePort); // 这个看名字就知道什么意思了 worker->suspendIfNeeded(); // 这里chrome已经注释了 // We don't currently support nested workers, so workers can only be created from documents. 
Document* document = toDocument(context); // 判断下能不能连上 if (!document->securityOrigin()->canAccessSharedWorkers()) { exceptionState.throwSecurityError("Access to shared workers is denied to origin '" + document->securityOrigin()->toString() + "'."); return nullptr; } // 判断url的安全性 KURL scriptURL = worker->resolveURL(url, exceptionState); if (scriptURL.isEmpty()) return nullptr; // 创建document与worker的连接 if (document->frame()->loader().client()->sharedWorkerRepositoryClient()) document->frame()->loader().client()->sharedWorkerRepositoryClient()->connect(worker.get(), remotePort.release(), scriptURL, name, exceptionState); // 返回worker的实例 return worker.release(); } SharedWorker::~SharedWorker() { } const AtomicString& SharedWorker::interfaceName() const { return EventTargetNames::SharedWorker; } bool SharedWorker::hasPendingActivity() const { return m_isBeingConnected; }
其实SharedWorker挺简单的,主要就是进程管理和通信两件事。下面看通信是如何实现的(急需恶补一番底层通信知识,预定下周把底层通信手段学习一遍)。
static void createChannel(MessagePort* port1, MessagePort* port2) { // 创建连接 WebMessagePortChannel* channel1; WebMessagePortChannel* channel2; Platform::current()->createMessageChannel(&channel1, &channel2); ASSERT(channel1 && channel2); // Now entangle the proxies with the appropriate local ports. port1->entangle(adoptPtr(channel2)); port2->entangle(adoptPtr(channel1)); } MessageChannel::MessageChannel(ExecutionContext* context) // MessageChannel里面就是搞两个port : m_port1(MessagePort::create(*context)) , m_port2(MessagePort::create(*context)) { createChannel(m_port1.get(), m_port2.get()); }
好吧,这个类也不怎么干活,真正的活都在WebMessagePortChannel和MessagePort里面做。
MessagePort
// NOTE(review): original author's open question - if the underlying transport
// is pipe-like, two sets of channels may be needed (one on the worker side,
// one on the web side). Not established by the code below; confirm separately.

// Factory: allocates a MessagePort bound to |executionContext|, calls
// suspendIfNeeded() on it and returns it to the caller.
PassRefPtrWillBeRawPtr<MessagePort> MessagePort::create(ExecutionContext& executionContext)
{
    RefPtrWillBeRawPtr<MessagePort> port = adoptRefWillBeNoop(new MessagePort(executionContext));
    port->suspendIfNeeded();
    return port.release();
}

// Constructor: registers with ActiveDOMObject for |executionContext| and
// starts out neither started nor closed. m_weakFactory is bound to this object
// and is used by messageAvailable() to create weak pointers.
MessagePort::MessagePort(ExecutionContext& executionContext)
    : ActiveDOMObject(&executionContext)
    , m_started(false)
    , m_closed(false)
    , m_weakFactory(this)
{
}

// Destructor: closes the port and, if a script state was stored in
// m_scriptStateForConversion, disposes its per-context data.
MessagePort::~MessagePort()
{
    close();
    if (m_scriptStateForConversion)
        m_scriptStateForConversion->disposePerContextData();
}

// Sends |message| through the entangled channel, optionally transferring
// |ports| along with it.
//
// Does nothing if this port is not entangled. Throws DataCloneError on
// |exceptionState| and returns without sending if |ports| contains this port
// itself, or if disentanglePorts() rejects the list.
void MessagePort::postMessage(ExecutionContext* context, PassRefPtr<SerializedScriptValue> message, const MessagePortArray* ports, ExceptionState& exceptionState)
{
    // A port that is not entangled has no channel to send on.
    if (!isEntangled())
        return;
    ASSERT(executionContext());
    ASSERT(m_entangledChannel);

    // Channels detached from the ports being transferred (stays null when
    // |ports| is null).
    OwnPtr<MessagePortChannelArray> channels;
    // Make sure we aren't connected to any of the passed-in ports.
    if (ports) {
        for (unsigned i = 0; i < ports->size(); ++i) {
            MessagePort* dataPort = (*ports)[i].get();
            if (dataPort == this) {
                exceptionState.throwDOMException(DataCloneError, "Port at index " + String::number(i) + " contains the source port.");
                return;
            }
        }
        // The ports being transferred (not this port) are disentangled here:
        // their underlying channels are detached so that they can be handed to
        // the channel together with the message.
        channels = MessagePort::disentanglePorts(context, ports, exceptionState);
        if (exceptionState.hadException())
            return;
    }

    // Serialize the message and convert the channel array to its Web* form.
    WebString messageString = message->toWireString();
    OwnPtr<WebMessagePortChannelArray> webChannels = toWebMessagePortChannelArray(channels.release());
    // leakPtr() gives up local ownership of the array.
    // NOTE(review): presumably the channel takes ownership of it - confirm
    // against WebMessagePortChannel::postMessage.
    m_entangledChannel->postMessage(messageString, webChannels.leakPtr());
}

// static
// Converts an array of owned channels into a WebMessagePortChannelArray of raw
// channel pointers (each element is released with leakPtr()). Returns a null
// pointer when |channels| is null or empty.
PassOwnPtr<WebMessagePortChannelArray> MessagePort::toWebMessagePortChannelArray(PassOwnPtr<MessagePortChannelArray> channels)
{
    OwnPtr<WebMessagePortChannelArray> webChannels;
    if (channels && channels->size()) {
        webChannels = adoptPtr(new WebMessagePortChannelArray(channels->size()));
        for (size_t i = 0; i < channels->size(); ++i)
            (*webChannels)[i] = (*channels)[i].leakPtr();
    }
    return webChannels.release();
}

// static
// Reverse direction: adopts every raw channel in |webChannels| and entangles a
// new MessagePort with each one via entanglePorts(). Returns null when
// |webChannels| is empty (note that entanglePorts() itself returns an empty
// array, not null, for empty input).
PassOwnPtrWillBeRawPtr<MessagePortArray> MessagePort::toMessagePortArray(ExecutionContext* context, const WebMessagePortChannelArray& webChannels)
{
    OwnPtrWillBeRawPtr<MessagePortArray> ports = nullptr;
    if (!webChannels.isEmpty()) {
        OwnPtr<MessagePortChannelArray> channels = adoptPtr(new MessagePortChannelArray(webChannels.size()));
        for (size_t i = 0; i < webChannels.size(); ++i)
            (*channels)[i] = adoptPtr(webChannels[i]);
        ports = MessagePort::entanglePorts(*context, channels.release());
    }
    return ports.release();
}

// Detaches this port from its channel and hands the channel to the caller.
// Requires the port to be entangled.
PassOwnPtr<WebMessagePortChannel> MessagePort::disentangle()
{
    ASSERT(m_entangledChannel);
    // Clear the channel's client (this port) before giving the channel away.
    m_entangledChannel->setClient(0);
    return m_entangledChannel.release();
}

// Invoked to notify us that there are messages available for this port.
// This code may be called from another thread, and so should not call any non-threadsafe APIs (i.e.
// should not call into the entangled channel or access mutable variables).
// It only posts a task to the execution context that runs dispatchMessages()
// through a weak pointer to this port.
void MessagePort::messageAvailable()
{
    ASSERT(executionContext());
    executionContext()->postTask(FROM_HERE, createCrossThreadTask(&MessagePort::dispatchMessages, m_weakFactory.createWeakPtr()));
}

// Marks the port as started (at most once) and schedules a dispatch pass via
// messageAvailable().
void MessagePort::start()
{
    // Do nothing if we've been cloned or closed.
    if (!isEntangled())
        return;

    ASSERT(executionContext());
    if (m_started)
        return;

    m_started = true;
    messageAvailable();
}

// Closes the port: clears the channel's client if entangled and sets m_closed.
// The channel itself is not released here.
void MessagePort::close()
{
    if (isEntangled())
        m_entangledChannel->setClient(0);
    m_closed = true;
}

// Takes ownership of |remote| and registers this port as its client.
void MessagePort::entangle(PassOwnPtr<WebMessagePortChannel> remote)
{
    // Only invoked to set our initial entanglement.
    ASSERT(!m_entangledChannel);
    ASSERT(executionContext());

    m_entangledChannel = remote;
    m_entangledChannel->setClient(this);
}

// Event-target interface name for this object.
const AtomicString& MessagePort::interfaceName() const
{
    return EventTargetNames::MessagePort;
}

// Tries to pull one message from |webChannel|.
//
// Returns false if the channel has nothing to deliver. On success, |message|
// receives the value deserialized from the wire string and, if channels
// arrived with the message, |channels| receives a new array that adopts them.
// |channels| is left untouched when no channels arrived.
static bool tryGetMessageFrom(WebMessagePortChannel& webChannel, RefPtr<SerializedScriptValue>& message, OwnPtr<MessagePortChannelArray>& channels)
{
    WebString messageString;
    WebMessagePortChannelArray webChannels;
    if (!webChannel.tryGetMessage(&messageString, webChannels))
        return false;

    if (webChannels.size()) {
        channels = adoptPtr(new MessagePortChannelArray(webChannels.size()));
        for (size_t i = 0; i < webChannels.size(); ++i)
            (*channels)[i] = adoptPtr(webChannels[i]);
    }
    message = SerializedScriptValueFactory::instance().createFromWire(messageString);
    return true;
}

// Member wrapper around tryGetMessageFrom(); returns false when this port has
// no entangled channel.
bool MessagePort::tryGetMessage(RefPtr<SerializedScriptValue>& message, OwnPtr<MessagePortChannelArray>& channels)
{
    if (!m_entangledChannel)
        return false;
    return tryGetMessageFrom(*m_entangledChannel, message, channels);
}

// Drains the channel: for every pending message, entangles new ports for the
// transferred channels, wraps everything in a MessageEvent and dispatches it.
void MessagePort::dispatchMessages()
{
    // Because close() doesn't cancel any in flight calls to dispatchMessages() we need to check if the port is still open before dispatch.
    if (m_closed)
        return;

    // Messages for contexts that are not fully active get dispatched too, but JSAbstractEventListener::handleEvent() doesn't call handlers for these.
    // The HTML5 spec specifies that any messages sent to a document that is not fully active should be dropped, so this behavior is OK.
    if (!started())
        return;

    RefPtr<SerializedScriptValue> message;
    OwnPtr<MessagePortChannelArray> channels;
    // Pull messages from the channel until none are left.
    while (tryGetMessage(message, channels)) {
        // close() in Worker onmessage handler should prevent next message from dispatching.
        if (executionContext()->isWorkerGlobalScope() && toWorkerGlobalScope(executionContext())->isClosing())
            return;

        // Channels received with the message are entangled with newly created
        // ports in this execution context.
        OwnPtrWillBeRawPtr<MessagePortArray> ports = MessagePort::entanglePorts(*executionContext(), channels.release());
        RefPtrWillBeRawPtr<Event> evt = MessageEvent::create(ports.release(), message.release());

        dispatchEvent(evt.release(), ASSERT_NO_EXCEPTION);
    }
}

// Reports pending activity while the port is both started and entangled.
bool MessagePort::hasPendingActivity() const
{
    // The spec says that entangled message ports should always be treated as if they have a strong reference.
    // We'll also stipulate that the queue needs to be open (if the app drops its reference to the port before start()-ing it, then it's not really entangled as it's unreachable).
    return m_started && isEntangled();
}

// Validates a transfer list and detaches the channel behind every port in it.
//
// Returns null for a null or empty list. Throws DataCloneError on
// |exceptionState| and returns null if any entry is null, already neutered or
// a duplicate; in that case no port has been disentangled, because validation
// of the whole list happens before the first disentangle() call.
PassOwnPtr<MessagePortChannelArray> MessagePort::disentanglePorts(ExecutionContext* context, const MessagePortArray* ports, ExceptionState& exceptionState)
{
    if (!ports || !ports->size())
        return nullptr;

    // HashSet used to efficiently check for duplicates in the passed-in array.
    HashSet<MessagePort*> portSet;

    // Walk the incoming array - if there are any duplicate ports, or null ports or cloned ports, throw an error (per section 8.3.3 of the HTML5 spec).
    for (unsigned i = 0; i < ports->size(); ++i) {
        MessagePort* port = (*ports)[i].get();
        if (!port || port->isNeutered() || portSet.contains(port)) {
            String type;
            if (!port)
                type = "null";
            else if (port->isNeutered())
                type = "already neutered";
            else
                type = "a duplicate";
            exceptionState.throwDOMException(DataCloneError, "Port at index " + String::number(i) + " is " + type + ".");
            return nullptr;
        }
        portSet.add(port);
    }

    UseCounter::count(context, UseCounter::MessagePortsTransferred);

    // Passed-in ports passed validity checks, so we can disentangle them.
    // Each port gives up its channel; the channels are collected in order.
    OwnPtr<MessagePortChannelArray> portArray = adoptPtr(new MessagePortChannelArray(ports->size()));
    for (unsigned i = 0; i < ports->size(); ++i)
        (*portArray)[i] = (*ports)[i]->disentangle();
    return portArray.release();
}

// Creates one new MessagePort in |context| per channel and entangles it with
// that channel. Returns an empty (non-null) array when |channels| is null or
// empty.
PassOwnPtrWillBeRawPtr<MessagePortArray> MessagePort::entanglePorts(ExecutionContext& context, PassOwnPtr<MessagePortChannelArray> channels)
{
    // https://html.spec.whatwg.org/multipage/comms.html#message-ports
    // |ports| should be an empty array, not null even when there is no ports.
    if (!channels || !channels->size())
        return adoptPtrWillBeNoop(new MessagePortArray());

    OwnPtrWillBeRawPtr<MessagePortArray> portArray = adoptPtrWillBeNoop(new MessagePortArray(channels->size()));
    for (unsigned i = 0; i < channels->size(); ++i) {
        RefPtrWillBeRawPtr<MessagePort> port = MessagePort::create(context);
        port->entangle((*channels)[i].release());
        (*portArray)[i] = port.release();
    }
    return portArray.release();
}
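主要就是entangle函数里面会做一次remote->setClient(this),把port注册为channel的client,然后dispatchEvent就很方便了。疑问是:为什么postMessage会做一次disentangle?(从代码看,被disentangle的是作为参数传入、要随消息一起转移的ports,而不是当前port本身:它们底层的channel被取出后随消息一起交给m_entangledChannel->postMessage。)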
port这部分看得有点晕,还是先去理解一下底层通信吧。